Skip to content

Commit

Permalink
chore(topology): Transform outputs hash table of OutputId -> Definiti…
Browse files Browse the repository at this point in the history
…on (vectordotdev#17059)

Transform outputs hash table of OutputId -> Definition

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
  • Loading branch information
StephenWakely authored Apr 11, 2023
1 parent 887d6d7 commit 1bdb24d
Show file tree
Hide file tree
Showing 24 changed files with 283 additions and 212 deletions.
6 changes: 3 additions & 3 deletions benches/remap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ fn benchmark_remap(c: &mut Criterion) {

let add_fields_runner = |tform: &mut Box<dyn SyncTransform>, event: Event| {
let mut outputs = TransformOutputsBuf::new_with_capacity(
vec![TransformOutput::new(DataType::all(), vec![])],
vec![TransformOutput::new(DataType::all(), HashMap::new())],
1,
);
tform.transform(event, &mut outputs);
Expand Down Expand Up @@ -80,7 +80,7 @@ fn benchmark_remap(c: &mut Criterion) {

let json_parser_runner = |tform: &mut Box<dyn SyncTransform>, event: Event| {
let mut outputs = TransformOutputsBuf::new_with_capacity(
vec![TransformOutput::new(DataType::all(), vec![])],
vec![TransformOutput::new(DataType::all(), HashMap::new())],
1,
);
tform.transform(event, &mut outputs);
Expand Down Expand Up @@ -134,7 +134,7 @@ fn benchmark_remap(c: &mut Criterion) {
let coerce_runner =
|tform: &mut Box<dyn SyncTransform>, event: Event, timestamp: DateTime<Utc>| {
let mut outputs = TransformOutputsBuf::new_with_capacity(
vec![TransformOutput::new(DataType::all(), vec![])],
vec![TransformOutput::new(DataType::all(), HashMap::new())],
1,
);
tform.transform(event, &mut outputs);
Expand Down
20 changes: 17 additions & 3 deletions lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use std::{fmt, num::NonZeroUsize};
use std::{collections::HashMap, fmt, num::NonZeroUsize};

use bitmask_enum::bitmask;
use bytes::Bytes;
use chrono::{DateTime, Utc};

mod global_options;
mod log_schema;
pub mod output_id;
pub mod proxy;

use crate::event::LogEvent;
pub use global_options::GlobalOptions;
pub use log_schema::{init_log_schema, log_schema, LogSchema};
use lookup::{lookup_v2::ValuePath, path, PathPrefix};
pub use output_id::OutputId;
use serde::{Deserialize, Serialize};
use value::Value;
pub use vector_common::config::ComponentKey;
Expand Down Expand Up @@ -199,14 +201,14 @@ pub struct TransformOutput {
/// enabled, at least one definition should be output. If the transform
/// has multiple connected sources, it is possible to have multiple output
/// definitions - one for each input.
pub log_schema_definitions: Vec<schema::Definition>,
pub log_schema_definitions: HashMap<OutputId, schema::Definition>,
}

impl TransformOutput {
/// Create a `TransformOutput` of the given data type that contains multiple [`schema::Definition`]s.
/// Designed for use in transforms.
#[must_use]
pub fn new(ty: DataType, schema_definitions: Vec<schema::Definition>) -> Self {
pub fn new(ty: DataType, schema_definitions: HashMap<OutputId, schema::Definition>) -> Self {
Self {
port: None,
ty,
Expand All @@ -222,6 +224,18 @@ impl TransformOutput {
}
}

/// Simple utility function that can be used by transforms that make no changes to
/// the schema definitions of events.
/// Takes a list of definitions with [`OutputId`] returns them as a [`HashMap`].
pub fn clone_input_definitions(
input_definitions: &[(OutputId, schema::Definition)],
) -> HashMap<OutputId, schema::Definition> {
input_definitions
.iter()
.map(|(output, definition)| (output.clone(), definition.clone()))
.collect()
}

/// Source-specific end-to-end acknowledgements configuration.
///
/// This type exists solely to provide a source-specific description of the `acknowledgements`
Expand Down
90 changes: 90 additions & 0 deletions lib/vector-core/src/config/output_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use std::fmt;

use vector_common::config::ComponentKey;

use crate::{config::configurable_component, schema};

/// Component output identifier.
#[configurable_component]
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct OutputId {
/// The component to which the output belongs.
pub component: ComponentKey,

/// The output port name, if not the default.
pub port: Option<String>,
}

impl OutputId {
/// Some situations, for example when validating a config file requires running the
/// `transforms::output` function to retrieve the outputs, but we don't have an
/// `OutputId` from a source. This gives us an `OutputId` that we can use.
///
/// TODO: This is not a pleasant solution, but would require some significant refactoring
/// to the topology code to avoid.
pub fn dummy() -> Self {
Self {
component: "dummy".into(),
port: None,
}
}

/// Given a list of [`schema::Definition`]s, returns a [`Vec`] of tuples of
/// this `OutputId` with each `Definition`.
pub fn with_definitions(
&self,
definitions: impl IntoIterator<Item = schema::Definition>,
) -> Vec<(OutputId, schema::Definition)> {
definitions
.into_iter()
.map(|definition| (self.clone(), definition))
.collect()
}
}

impl fmt::Display for OutputId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.port {
None => self.component.fmt(f),
Some(port) => write!(f, "{}.{port}", self.component),
}
}
}

impl From<ComponentKey> for OutputId {
fn from(key: ComponentKey) -> Self {
Self {
component: key,
port: None,
}
}
}

impl From<&ComponentKey> for OutputId {
fn from(key: &ComponentKey) -> Self {
Self::from(key.clone())
}
}

impl From<(&ComponentKey, String)> for OutputId {
fn from((key, name): (&ComponentKey, String)) -> Self {
Self {
component: key.clone(),
port: Some(name),
}
}
}

// This panicking implementation is convenient for testing, but should never be enabled for use
// outside of tests.
#[cfg(any(test, feature = "test"))]
impl From<&str> for OutputId {
fn from(s: &str) -> Self {
assert!(
!s.contains('.'),
"Cannot convert dotted paths to strings without more context"
);
let component = ComponentKey::from(s);
component.into()
}
}
17 changes: 10 additions & 7 deletions src/config/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ mod test {
in_ty,
outputs: vec![TransformOutput::new(
out_ty,
vec![Definition::default_legacy_namespace()],
[("test".into(), Definition::default_legacy_namespace())].into(),
)],
},
);
Expand All @@ -415,8 +415,11 @@ mod test {
let id = id.into();
match self.nodes.get_mut(&id) {
Some(Node::Transform { outputs, .. }) => outputs.push(
TransformOutput::new(ty, vec![Definition::default_legacy_namespace()])
.with_port(name),
TransformOutput::new(
ty,
[("test".into(), Definition::default_legacy_namespace())].into(),
)
.with_port(name),
),
_ => panic!("invalid transform"),
}
Expand Down Expand Up @@ -651,11 +654,11 @@ mod test {
outputs: vec![
TransformOutput::new(
DataType::all(),
vec![Definition::default_legacy_namespace()],
[("test".into(), Definition::default_legacy_namespace())].into(),
),
TransformOutput::new(
DataType::all(),
vec![Definition::default_legacy_namespace()],
[("test".into(), Definition::default_legacy_namespace())].into(),
)
.with_port("bar"),
],
Expand All @@ -676,11 +679,11 @@ mod test {
outputs: vec![
TransformOutput::new(
DataType::all(),
vec![Definition::default_legacy_namespace()],
[("test".into(), Definition::default_legacy_namespace())].into(),
),
TransformOutput::new(
DataType::all(),
vec![Definition::default_legacy_namespace()],
[("test".into(), Definition::default_legacy_namespace())].into(),
)
.with_port("errors"),
],
Expand Down
89 changes: 1 addition & 88 deletions src/config/id.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::{fmt, ops::Deref};
use std::ops::Deref;

use vector_config::configurable_component;
pub use vector_core::config::ComponentKey;

use super::schema;

/// A list of upstream [source][sources] or [transform][transforms] IDs.
///
/// Wildcards (`*`) are supported.
Expand Down Expand Up @@ -96,88 +94,3 @@ impl<T> From<Vec<T>> for Inputs<T> {
Self(inputs)
}
}

/// Component output identifier.
#[configurable_component]
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct OutputId {
/// The component to which the output belongs.
pub component: ComponentKey,

/// The output port name, if not the default.
pub port: Option<String>,
}

impl OutputId {
/// Some situations, for example when validating a config file requires running the
/// transforms::output function to retrieve the outputs, but we don't have an
/// `OutputId` from a source. This gives us an `OutputId` that we can use.
///
/// TODO: This is not a pleasant solution, but would require some significant refactoring
/// to the topology code to avoid.
pub fn dummy() -> Self {
Self {
component: "dummy".into(),
port: None,
}
}

/// Given a list of [`schema::Definition`]s, returns a [`Vec`] of tuples of
/// this `OutputId` with each `Definition`.
pub fn with_definitions(
&self,
definitions: impl IntoIterator<Item = schema::Definition>,
) -> Vec<(OutputId, schema::Definition)> {
definitions
.into_iter()
.map(|definition| (self.clone(), definition))
.collect()
}
}

impl fmt::Display for OutputId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.port {
None => self.component.fmt(f),
Some(port) => write!(f, "{}.{}", self.component, port),
}
}
}

impl From<ComponentKey> for OutputId {
fn from(key: ComponentKey) -> Self {
Self {
component: key,
port: None,
}
}
}

impl From<&ComponentKey> for OutputId {
fn from(key: &ComponentKey) -> Self {
Self::from(key.clone())
}
}

impl From<(&ComponentKey, String)> for OutputId {
fn from((key, name): (&ComponentKey, String)) -> Self {
Self {
component: key.clone(),
port: Some(name),
}
}
}

// This panicking implementation is convenient for testing, but should never be enabled for use
// outside of tests.
#[cfg(test)]
impl From<&str> for OutputId {
fn from(s: &str) -> Self {
assert!(
!s.contains('.'),
"Cannot convert dotted paths to strings without more context"
);
let component = ComponentKey::from(s);
component.into()
}
}
6 changes: 4 additions & 2 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub use cmd::{cmd, Opts};
pub use diff::ConfigDiff;
pub use enrichment_table::{EnrichmentTableConfig, EnrichmentTableOuter};
pub use format::{Format, FormatHint};
pub use id::{ComponentKey, Inputs, OutputId};
pub use id::{ComponentKey, Inputs};
pub use loading::{
load, load_builder_from_paths, load_from_paths, load_from_paths_with_provider_and_secrets,
load_from_str, load_source_from_paths, merge_path_lists, process_paths, CONFIG_PATHS,
Expand All @@ -57,7 +57,9 @@ pub use source::{BoxedSource, SourceConfig, SourceContext, SourceOuter};
pub use transform::{BoxedTransform, TransformConfig, TransformContext, TransformOuter};
pub use unit_test::{build_unit_tests, build_unit_tests_main, UnitTestResult};
pub use validation::warnings;
pub use vector_core::config::{init_log_schema, log_schema, proxy::ProxyConfig, LogSchema};
pub use vector_core::config::{
init_log_schema, log_schema, proxy::ProxyConfig, LogSchema, OutputId,
};

#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum ConfigPath {
Expand Down
8 changes: 5 additions & 3 deletions src/config/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub struct TransformContext {
///
/// Given a transform can expose multiple [`TransformOutput`] channels, the ID is tied to the identifier of
/// that `TransformOutput`.
pub schema_definitions: HashMap<Option<String>, Vec<schema::Definition>>,
pub schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,

/// The schema definition created by merging all inputs of the transform.
///
Expand All @@ -129,7 +129,7 @@ impl Default for TransformContext {
key: Default::default(),
globals: Default::default(),
enrichment_tables: Default::default(),
schema_definitions: HashMap::from([(None, vec![schema::Definition::any()])]),
schema_definitions: HashMap::from([(None, HashMap::new())]),
merged_schema_definition: schema::Definition::any(),
schema: SchemaOptions::default(),
}
Expand All @@ -148,7 +148,9 @@ impl TransformContext {
}

#[cfg(any(test, feature = "test"))]
pub fn new_test(schema_definitions: HashMap<Option<String>, Vec<schema::Definition>>) -> Self {
pub fn new_test(
schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,
) -> Self {
Self {
schema_definitions,
..Default::default()
Expand Down
2 changes: 1 addition & 1 deletion src/test_util/mock/transforms/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl TransformConfig for BasicTransformConfig {
DataType::all(),
definitions
.iter()
.map(|(_output, definition)| definition.clone())
.map(|(output, definition)| (output.clone(), definition.clone()))
.collect(),
)]
}
Expand Down
2 changes: 1 addition & 1 deletion src/test_util/mock/transforms/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl TransformConfig for NoopTransformConfig {
DataType::all(),
definitions
.iter()
.map(|(_output, definition)| definition.clone())
.map(|(output, definition)| (output.clone(), definition.clone()))
.collect(),
)]
}
Expand Down
Loading

0 comments on commit 1bdb24d

Please sign in to comment.