Skip to content

Commit

Permalink
chore(schema): Example for adding log_namespace support to `demo_lo…
Browse files Browse the repository at this point in the history
…gs` source (#13720)
  • Loading branch information
fuchsnj authored Jul 27, 2022
1 parent 24ab156 commit b838a19
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 18 deletions.
7 changes: 6 additions & 1 deletion src/sinks/util/adaptive_concurrency/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,12 @@ async fn run_test(params: TestParams) -> TestResults {
let cstats = Arc::clone(&test_config.controller_stats);

let mut config = config::Config::builder();
let demo_logs = DemoLogsConfig::repeat(vec!["line 1".into()], params.requests, params.interval);
let demo_logs = DemoLogsConfig::repeat(
vec!["line 1".into()],
params.requests,
params.interval,
None,
);
config.add_source("in", demo_logs);
config.add_sink("out", &["in"], test_config);

Expand Down
59 changes: 43 additions & 16 deletions src/sources/demo_logs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use std::task::Poll;

use bytes::Bytes;
use chrono::Utc;
use codecs::{
decoding::{DeserializerConfig, FramingConfig},
Expand All @@ -10,6 +7,7 @@ use fakedata::logs::*;
use futures::StreamExt;
use rand::seq::SliceRandom;
use snafu::Snafu;
use std::task::Poll;
use tokio::time::{self, Duration};
use tokio_util::codec::FramedRead;
use vector_config::configurable_component;
Expand Down Expand Up @@ -55,6 +53,10 @@ pub struct DemoLogsConfig {
#[configurable(derived)]
#[derivative(Default(value = "default_decoding()"))]
pub decoding: DeserializerConfig,

/// The namespace to use for logs. This overrides the global setting
#[serde(default)]
pub log_namespace: Option<bool>,
}

const fn default_interval() -> f64 {
Expand Down Expand Up @@ -149,8 +151,13 @@ impl OutputFormat {
}

impl DemoLogsConfig {
#[allow(dead_code)] // to make check-component-features pass
pub fn repeat(lines: Vec<String>, count: usize, interval: f64) -> Self {
#[cfg(test)]
pub fn repeat(
lines: Vec<String>,
count: usize,
interval: f64,
log_namespace: Option<bool>,
) -> Self {
Self {
count,
interval,
Expand All @@ -160,6 +167,7 @@ impl DemoLogsConfig {
},
framing: default_framing_message_based(),
decoding: default_decoding(),
log_namespace,
}
}
}
Expand All @@ -171,6 +179,7 @@ async fn demo_logs_source(
decoder: Decoder,
mut shutdown: ShutdownSignal,
mut out: SourceSender,
log_namespace: LogNamespace,
) -> Result<(), ()> {
let maybe_interval: Option<f64> = (interval != 0.0).then(|| interval);

Expand Down Expand Up @@ -204,9 +213,18 @@ async fn demo_logs_source(

let events = events.into_iter().map(|mut event| {
let log = event.as_mut_log();

log.try_insert(log_schema().source_type_key(), Bytes::from("demo_logs"));
log.try_insert(log_schema().timestamp_key(), now);
log_namespace.insert_vector_metadata(
log,
log_schema().source_type_key(),
"source_type",
"demo_logs",
);
log_namespace.insert_vector_metadata(
log,
log_schema().timestamp_key(),
"ingest_timestamp",
now,
);

event
});
Expand Down Expand Up @@ -242,25 +260,33 @@ impl_generate_config_from_default!(DemoLogsConfig);
#[typetag::serde(name = "demo_logs")]
impl SourceConfig for DemoLogsConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
let log_namespace = cx.log_namespace(self.log_namespace);

self.format.validate()?;
let decoder = DecodingConfig::new(
self.framing.clone(),
self.decoding.clone(),
LogNamespace::Legacy,
)
.build();
let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
Ok(Box::pin(demo_logs_source(
self.interval,
self.count,
self.format.clone(),
decoder,
cx.shutdown,
cx.out,
log_namespace,
)))
}

fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<Output> {
vec![Output::default(self.decoding.output_type())]
fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<Output> {
// There is a global and per-source `log_namespace` config. The source config overrides the global setting,
// and is merged here.
let log_namespace = global_log_namespace.merge(self.log_namespace);

let schema_definition = self
.decoding
.schema_definition(log_namespace)
.with_standard_vector_source_metadata();

vec![Output::default(self.decoding.output_type()).with_schema_definition(schema_definition)]
}

fn source_type(&self) -> &'static str {
Expand Down Expand Up @@ -330,6 +356,7 @@ mod tests {
decoder,
ShutdownSignal::noop(),
tx,
LogNamespace::Legacy,
)
.await
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/topology/test/source_finished.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
#[tokio::test]
async fn sources_finished() {
let mut old_config = Config::builder();
let demo_logs = DemoLogsConfig::repeat(vec!["text".to_owned()], 1, 0.0);
let demo_logs = DemoLogsConfig::repeat(vec!["text".to_owned()], 1, 0.0, None);
old_config.add_source("in", demo_logs);
old_config.add_sink(
"out",
Expand Down

0 comments on commit b838a19

Please sign in to comment.