Skip to content

Commit

Permalink
chore: Re-add transform definitions (vectordotdev#17152)
Browse files Browse the repository at this point in the history
* Revert "Revert "enhancement(topology): Update transforms to handle multiple definitions (vectordotdev#16793)""

This reverts commit 5dc20f3.

* Revert "Revert "chore(topology): split `build_pieces` into smaller functions (vectordotdev#17037)""

This reverts commit 0e11bc3.

* Revert "Revert "chore(topology): Transform outputs hash table of OutputId -> Definition (vectordotdev#17059)""

This reverts commit 8916ec1.

---------

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
  • Loading branch information
StephenWakely authored Apr 27, 2023
1 parent 27c3526 commit 9031d0f
Show file tree
Hide file tree
Showing 87 changed files with 2,714 additions and 2,088 deletions.
20 changes: 13 additions & 7 deletions benches/remap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use chrono::{DateTime, Utc};
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use indexmap::IndexMap;
use vector::{
config::{DataType, Output},
config::{DataType, TransformOutput},
event::{Event, LogEvent, Value},
transforms::{
remap::{Remap, RemapConfig},
Expand All @@ -27,8 +27,10 @@ fn benchmark_remap(c: &mut Criterion) {
let mut group = c.benchmark_group("remap");

let add_fields_runner = |tform: &mut Box<dyn SyncTransform>, event: Event| {
let mut outputs =
TransformOutputsBuf::new_with_capacity(vec![Output::default(DataType::all())], 1);
let mut outputs = TransformOutputsBuf::new_with_capacity(
vec![TransformOutput::new(DataType::all(), HashMap::new())],
1,
);
tform.transform(event, &mut outputs);
let result = outputs.take_primary();
let output_1 = result.first().unwrap().as_log();
Expand Down Expand Up @@ -77,8 +79,10 @@ fn benchmark_remap(c: &mut Criterion) {
});

let json_parser_runner = |tform: &mut Box<dyn SyncTransform>, event: Event| {
let mut outputs =
TransformOutputsBuf::new_with_capacity(vec![Output::default(DataType::all())], 1);
let mut outputs = TransformOutputsBuf::new_with_capacity(
vec![TransformOutput::new(DataType::all(), HashMap::new())],
1,
);
tform.transform(event, &mut outputs);
let result = outputs.take_primary();
let output_1 = result.first().unwrap().as_log();
Expand Down Expand Up @@ -129,8 +133,10 @@ fn benchmark_remap(c: &mut Criterion) {

let coerce_runner =
|tform: &mut Box<dyn SyncTransform>, event: Event, timestamp: DateTime<Utc>| {
let mut outputs =
TransformOutputsBuf::new_with_capacity(vec![Output::default(DataType::all())], 1);
let mut outputs = TransformOutputsBuf::new_with_capacity(
vec![TransformOutput::new(DataType::all(), HashMap::new())],
1,
);
tform.transform(event, &mut outputs);
let result = outputs.take_primary();
let output_1 = result.first().unwrap().as_log();
Expand Down
6 changes: 3 additions & 3 deletions benches/transform/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use vector::transforms::{
TransformOutputsBuf,
};
use vector_core::{
config::{DataType, Output},
config::{DataType, TransformOutput},
event::{Event, EventContainer, EventMetadata, LogEvent},
transform::{SyncTransform, TransformContext},
};
Expand Down Expand Up @@ -54,10 +54,10 @@ fn route(c: &mut Criterion) {
"bba", "bbca", "dba", "bea", "fba", "gba", "hba", "iba", "jba", "bka", "bal", "bma", "bna",
"boa", "bpa", "bqa", "bra", "bsa", "bta", "bua", "bva", "bwa", "xba", "aby", "zba",
] {
outputs.push(Output {
outputs.push(TransformOutput {
port: Some(String::from(name)),
ty: DataType::Log,
log_schema_definition: None,
log_schema_definitions: Vec::new(),
});
}
let output_buffer: TransformOutputsBuf = TransformOutputsBuf::new_with_capacity(outputs, 10);
Expand Down
236 changes: 216 additions & 20 deletions lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use std::{fmt, num::NonZeroUsize};
use std::{collections::HashMap, fmt, num::NonZeroUsize};

use bitmask_enum::bitmask;
use bytes::Bytes;
use chrono::{DateTime, Utc};

mod global_options;
mod log_schema;
pub mod output_id;
pub mod proxy;

use crate::event::LogEvent;
pub use global_options::GlobalOptions;
pub use log_schema::{init_log_schema, log_schema, LogSchema};
use lookup::{lookup_v2::ValuePath, path, PathPrefix};
pub use output_id::OutputId;
use serde::{Deserialize, Serialize};
use value::Value;
pub use vector_common::config::ComponentKey;
Expand Down Expand Up @@ -100,42 +102,119 @@ impl Input {
}

#[derive(Debug, Clone, PartialEq)]
pub struct Output {
pub struct SourceOutput {
pub port: Option<String>,
pub ty: DataType,

// NOTE: schema definitions are only implemented/supported for log-type events. There is no
// inherent blocker to support other types as well, but it'll require additional work to add
// the relevant schemas, and store them separately in this type.
pub schema_definition: Option<schema::Definition>,
}

impl SourceOutput {
/// Create a `SourceOutput` of the given data type that contains a single output `Definition`.
/// Designed for use in log sources.
///
/// The `None` variant of a schema definition has two distinct meanings for a source component
/// versus a transform component:
///
/// For *sources*, a `None` schema is identical to a `Some(Definition::source_default())`.
/// # Panics
///
/// For a *transform*, a schema [`schema::Definition`] is required if `Datatype` is Log.
pub log_schema_definition: Option<schema::Definition>,
}
/// Panics if `ty` does not contain [`DataType::Log`].
#[must_use]
pub fn new_logs(ty: DataType, schema_definition: schema::Definition) -> Self {
assert!(ty.contains(DataType::Log));

impl Output {
/// Create a default `Output` of the given data type.
///
/// A default output is one without a port identifier (i.e. not a named output) and the default
/// output consumers will receive if they declare the component itself as an input.
pub fn default(ty: DataType) -> Self {
Self {
port: None,
ty,
log_schema_definition: None,
schema_definition: Some(schema_definition),
}
}

/// Create a `SourceOutput` of the given data type that contains no output `Definition`s.
/// Designed for use in metrics sources.
///
/// Sets the datatype to be [`DataType::Metric`].
#[must_use]
pub fn new_metrics() -> Self {
Self {
port: None,
ty: DataType::Metric,
schema_definition: None,
}
}

/// Create a `SourceOutput` of the given data type that contains no output `Definition`s.
/// Designed for use in trace sources.
///
/// Sets the datatype to be [`DataType::Trace`].
#[must_use]
pub fn new_traces() -> Self {
Self {
port: None,
ty: DataType::Trace,
schema_definition: None,
}
}

/// Set the schema definition for this `Output`.
/// Return the schema [`schema::Definition`] from this output.
///
/// Takes a `schema_enabled` flag to determine if the full definition including the fields
/// and associated types should be returned, or if a simple definition should be returned.
/// A simple definition is just the default for the namespace. For the Vector namespace the
/// meanings are included.
/// Schema enabled is set in the users configuration.
#[must_use]
pub fn schema_definition(&self, schema_enabled: bool) -> Option<schema::Definition> {
self.schema_definition.as_ref().map(|definition| {
if schema_enabled {
definition.clone()
} else {
let mut new_definition =
schema::Definition::default_for_namespace(definition.log_namespaces());

if definition.log_namespaces().contains(&LogNamespace::Vector) {
new_definition.add_meanings(definition.meanings());
}

new_definition
}
})
}
}

impl SourceOutput {
/// Set the port name for this `SourceOutput`.
#[must_use]
pub fn with_schema_definition(mut self, schema_definition: schema::Definition) -> Self {
self.log_schema_definition = Some(schema_definition);
pub fn with_port(mut self, name: impl Into<String>) -> Self {
self.port = Some(name.into());
self
}
}

#[derive(Debug, Clone, PartialEq)]
pub struct TransformOutput {
pub port: Option<String>,
pub ty: DataType,

/// For *transforms* if `Datatype` is [`DataType::Log`], if schema is
/// enabled, at least one definition should be output. If the transform
/// has multiple connected sources, it is possible to have multiple output
/// definitions - one for each input.
pub log_schema_definitions: HashMap<OutputId, schema::Definition>,
}

impl TransformOutput {
/// Create a `TransformOutput` of the given data type that contains multiple [`schema::Definition`]s.
/// Designed for use in transforms.
#[must_use]
pub fn new(ty: DataType, schema_definitions: HashMap<OutputId, schema::Definition>) -> Self {
Self {
port: None,
ty,
log_schema_definitions: schema_definitions,
}
}

/// Set the port name for this `Output`.
#[must_use]
Expand All @@ -145,6 +224,18 @@ impl Output {
}
}

/// Simple utility function that can be used by transforms that make no changes to
/// the schema definitions of events.
/// Takes a list of definitions with [`OutputId`] returns them as a [`HashMap`].
pub fn clone_input_definitions(
input_definitions: &[(OutputId, schema::Definition)],
) -> HashMap<OutputId, schema::Definition> {
input_definitions
.iter()
.map(|(output, definition)| (output.clone(), definition.clone()))
.collect()
}

/// Source-specific end-to-end acknowledgements configuration.
///
/// This type exists solely to provide a source-specific description of the `acknowledgements`
Expand Down Expand Up @@ -427,10 +518,12 @@ impl LogNamespace {

#[cfg(test)]
mod test {
use crate::config::{init_log_schema, LogNamespace, LogSchema};
use super::*;
use crate::event::LogEvent;
use chrono::Utc;
use lookup::event_path;
use lookup::{event_path, owned_value_path, OwnedTargetPath};
use value::Kind;
use vector_common::btreemap;

#[test]
fn test_insert_standard_vector_source_metadata() {
Expand All @@ -446,4 +539,107 @@ mod test {

assert!(event.get(event_path!("a", "b", "c", "d")).is_some());
}

#[test]
fn test_source_definitions_legacy() {
let definition = schema::Definition::empty_legacy_namespace()
.with_event_field(&owned_value_path!("zork"), Kind::bytes(), Some("zork"))
.with_event_field(&owned_value_path!("nork"), Kind::integer(), None);
let output = SourceOutput::new_logs(DataType::Log, definition);

let valid_event = LogEvent::from(Value::from(btreemap! {
"zork" => "norknoog",
"nork" => 32
}))
.into();

let invalid_event = LogEvent::from(Value::from(btreemap! {
"nork" => 32
}))
.into();

// Get a definition with schema enabled.
let new_definition = output.schema_definition(true).unwrap();

// Meanings should still exist.
assert_eq!(
Some(&OwnedTargetPath::event(owned_value_path!("zork"))),
new_definition.meaning_path("zork")
);

// Events should have the schema validated.
new_definition.assert_valid_for_event(&valid_event);
new_definition.assert_invalid_for_event(&invalid_event);

// There should be the default legacy definition without schemas enabled.
assert_eq!(
Some(schema::Definition::default_legacy_namespace()),
output.schema_definition(false)
);
}

#[test]
fn test_source_definitons_vector() {
let definition = schema::Definition::default_for_namespace(&[LogNamespace::Vector].into())
.with_metadata_field(
&owned_value_path!("vector", "zork"),
Kind::integer(),
Some("zork"),
)
.with_event_field(&owned_value_path!("nork"), Kind::integer(), None);

let output = SourceOutput::new_logs(DataType::Log, definition);

let mut valid_event = LogEvent::from(Value::from(btreemap! {
"nork" => 32
}));

valid_event
.metadata_mut()
.value_mut()
.insert(path!("vector").concat("zork"), 32);

let valid_event = valid_event.into();

let mut invalid_event = LogEvent::from(Value::from(btreemap! {
"nork" => 32
}));

invalid_event
.metadata_mut()
.value_mut()
.insert(path!("vector").concat("zork"), "noog");

let invalid_event = invalid_event.into();

// Get a definition with schema enabled.
let new_definition = output.schema_definition(true).unwrap();

// Meanings should still exist.
assert_eq!(
Some(&OwnedTargetPath::metadata(owned_value_path!(
"vector", "zork"
))),
new_definition.meaning_path("zork")
);

// Events should have the schema validated.
new_definition.assert_valid_for_event(&valid_event);
new_definition.assert_invalid_for_event(&invalid_event);

// Get a definition without schema enabled.
let new_definition = output.schema_definition(false).unwrap();

// Meanings should still exist.
assert_eq!(
Some(&OwnedTargetPath::metadata(owned_value_path!(
"vector", "zork"
))),
new_definition.meaning_path("zork")
);

// Events should not have the schema validated.
new_definition.assert_valid_for_event(&valid_event);
new_definition.assert_valid_for_event(&invalid_event);
}
}
Loading

0 comments on commit 9031d0f

Please sign in to comment.