diff --git a/src/sinks/util/adaptive_concurrency/tests.rs b/src/sinks/util/adaptive_concurrency/tests.rs index e2d24b7632d8c..18e61b730dd88 100644 --- a/src/sinks/util/adaptive_concurrency/tests.rs +++ b/src/sinks/util/adaptive_concurrency/tests.rs @@ -416,7 +416,12 @@ async fn run_test(params: TestParams) -> TestResults { let cstats = Arc::clone(&test_config.controller_stats); let mut config = config::Config::builder(); - let demo_logs = DemoLogsConfig::repeat(vec!["line 1".into()], params.requests, params.interval); + let demo_logs = DemoLogsConfig::repeat( + vec!["line 1".into()], + params.requests, + params.interval, + None, + ); config.add_source("in", demo_logs); config.add_sink("out", &["in"], test_config); diff --git a/src/sources/demo_logs.rs b/src/sources/demo_logs.rs index 99d510ca99c71..9119b32c40499 100644 --- a/src/sources/demo_logs.rs +++ b/src/sources/demo_logs.rs @@ -1,6 +1,3 @@ -use std::task::Poll; - -use bytes::Bytes; use chrono::Utc; use codecs::{ decoding::{DeserializerConfig, FramingConfig}, @@ -10,6 +7,7 @@ use fakedata::logs::*; use futures::StreamExt; use rand::seq::SliceRandom; use snafu::Snafu; +use std::task::Poll; use tokio::time::{self, Duration}; use tokio_util::codec::FramedRead; use vector_config::configurable_component; @@ -55,6 +53,10 @@ pub struct DemoLogsConfig { #[configurable(derived)] #[derivative(Default(value = "default_decoding()"))] pub decoding: DeserializerConfig, + + /// The namespace to use for logs. This overrides the global setting + #[serde(default)] + pub log_namespace: Option, } const fn default_interval() -> f64 { @@ -149,8 +151,13 @@ impl OutputFormat { } impl DemoLogsConfig { - #[allow(dead_code)] // to make check-component-features pass - pub fn repeat(lines: Vec, count: usize, interval: f64) -> Self { + #[cfg(test)] + pub fn repeat( + lines: Vec, + count: usize, + interval: f64, + log_namespace: Option, + ) -> Self { Self { count, interval, @@ -160,6 +167,7 @@ impl DemoLogsConfig { }, framing: default_framing_message_based(), decoding: default_decoding(), + log_namespace, } } } @@ -171,6 +179,7 @@ async fn demo_logs_source( decoder: Decoder, mut shutdown: ShutdownSignal, mut out: SourceSender, + log_namespace: LogNamespace, ) -> Result<(), ()> { let maybe_interval: Option = (interval != 0.0).then(|| interval); @@ -204,9 +213,18 @@ async fn demo_logs_source( let events = events.into_iter().map(|mut event| { let log = event.as_mut_log(); - - log.try_insert(log_schema().source_type_key(), Bytes::from("demo_logs")); - log.try_insert(log_schema().timestamp_key(), now); + log_namespace.insert_vector_metadata( + log, + log_schema().source_type_key(), + "source_type", + "demo_logs", + ); + log_namespace.insert_vector_metadata( + log, + log_schema().timestamp_key(), + "ingest_timestamp", + now, + ); event }); @@ -242,13 +260,11 @@ impl_generate_config_from_default!(DemoLogsConfig); #[typetag::serde(name = "demo_logs")] impl SourceConfig for DemoLogsConfig { async fn build(&self, cx: SourceContext) -> crate::Result { + let log_namespace = cx.log_namespace(self.log_namespace); + self.format.validate()?; - let decoder = DecodingConfig::new( - self.framing.clone(), - self.decoding.clone(), - LogNamespace::Legacy, - ) - .build(); + let decoder = + DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build(); Ok(Box::pin(demo_logs_source( self.interval, self.count, @@ -256,11 +272,21 @@ impl SourceConfig for DemoLogsConfig { decoder, cx.shutdown, cx.out, + log_namespace, ))) } - fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec { - vec![Output::default(self.decoding.output_type())] + fn outputs(&self, global_log_namespace: LogNamespace) -> Vec { + // There is a global and per-source `log_namespace` config. The source config overrides the global setting, + // and is merged here. + let log_namespace = global_log_namespace.merge(self.log_namespace); + + let schema_definition = self + .decoding + .schema_definition(log_namespace) + .with_standard_vector_source_metadata(); + + vec![Output::default(self.decoding.output_type()).with_schema_definition(schema_definition)] } fn source_type(&self) -> &'static str { @@ -330,6 +356,7 @@ mod tests { decoder, ShutdownSignal::noop(), tx, + LogNamespace::Legacy, ) .await .unwrap(); diff --git a/src/topology/test/source_finished.rs b/src/topology/test/source_finished.rs index 7f9083737a099..ac0391cf8832b 100644 --- a/src/topology/test/source_finished.rs +++ b/src/topology/test/source_finished.rs @@ -11,7 +11,7 @@ use crate::{ #[tokio::test] async fn sources_finished() { let mut old_config = Config::builder(); - let demo_logs = DemoLogsConfig::repeat(vec!["text".to_owned()], 1, 0.0); + let demo_logs = DemoLogsConfig::repeat(vec!["text".to_owned()], 1, 0.0, None); old_config.add_source("in", demo_logs); old_config.add_sink( "out",