Skip to content

Commit

Permalink
perf(tap): avoid compiling globs frequently during tap (vectordotdev#…
Browse files Browse the repository at this point in the history
…19255)

Glob compilation can be CPU intensive when topology changes
are happening. This change avoids recompiling globs where practical.
  • Loading branch information
aholmberg authored and AndrooTheChen committed Sep 23, 2024
1 parent 32638b2 commit 67d2e47
Showing 1 changed file with 30 additions and 20 deletions.
50 changes: 30 additions & 20 deletions src/api/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,22 @@ impl GlobMatcher<&str> for String {
#[derive(Debug, Eq, PartialEq, Hash)]
enum Pattern {
/// A pattern used to tap into outputs of components
OutputPattern(String),
OutputPattern(glob::Pattern),
/// A pattern used to tap into inputs of components.
///
/// For a tap user, an input pattern is effectively a shortcut for specifying
/// one or more output patterns since a component's inputs are other
/// components' outputs. This variant captures the original user-supplied
/// pattern alongside the output patterns it's translated into.
InputPattern(String, Vec<String>),
InputPattern(String, Vec<glob::Pattern>),
}

impl GlobMatcher<&str> for Pattern {
fn matches_glob(&self, rhs: &str) -> bool {
match self {
Pattern::OutputPattern(pattern) => pattern.matches_glob(rhs),
Pattern::OutputPattern(pattern) => pattern.matches(rhs),
Pattern::InputPattern(_, patterns) => {
patterns.iter().any(|pattern| pattern.matches_glob(rhs))
patterns.iter().any(|pattern| pattern.matches(rhs))
}
}
}
Expand Down Expand Up @@ -289,18 +289,23 @@ async fn tap_handler(
}
});

let mut component_id_patterns = patterns.for_outputs.iter().cloned().map(Pattern::OutputPattern).collect::<HashSet<_>>();
let mut component_id_patterns = patterns.for_outputs.iter()
.filter_map(|p| glob::Pattern::new(p).ok())
.map(Pattern::OutputPattern).collect::<HashSet<_>>();

// Matching an input pattern is equivalent to matching the outputs of the component's inputs
for pattern in patterns.for_inputs.iter() {
match inputs.iter().filter(|(key, _)|
pattern.matches_glob(&key.to_string())
).flat_map(|(_, related_inputs)| related_inputs.iter().map(|id| id.to_string()).collect::<Vec<_>>()).collect::<HashSet<_>>() {
found if !found.is_empty() => {
component_id_patterns.insert(Pattern::InputPattern(pattern.clone(), found.into_iter().collect::<Vec<_>>()));
}
_ => {
debug!(message="Input pattern not expanded: no matching components.", ?pattern);
if let Ok(glob) = glob::Pattern::new(pattern) {
match inputs.iter().filter(|(key, _)|
glob.matches(&key.to_string())
).flat_map(|(_, related_inputs)| related_inputs.iter().map(|id| id.to_string()).collect::<Vec<_>>()).collect::<HashSet<_>>() {
found if !found.is_empty() => {
component_id_patterns.insert(Pattern::InputPattern(pattern.clone(), found.into_iter()
.filter_map(|p| glob::Pattern::new(&p).ok()).collect::<Vec<_>>()));
}
_ => {
debug!(message="Input pattern not expanded: no matching components.", ?pattern);
}
}
}
}
Expand Down Expand Up @@ -364,7 +369,7 @@ async fn tap_handler(

matched.extend(found.iter().map(|pattern| {
match pattern {
Pattern::OutputPattern(p) => p.to_owned(),
Pattern::OutputPattern(p) => p.to_string(),
Pattern::InputPattern(p, _) => p.to_owned(),
}
}));
Expand All @@ -391,16 +396,21 @@ async fn tap_handler(
}

// Warnings on invalid matches.

for pattern in patterns.for_inputs.iter() {
let invalid_matches = source_keys.iter().filter(|key| pattern.matches_glob(key)).cloned().collect::<Vec<_>>();
if !invalid_matches.is_empty() {
notifications.push(send_invalid_input_pattern_match(tx.clone(), pattern.clone(), invalid_matches).boxed())
if let Ok(glob) = glob::Pattern::new(pattern) {
let invalid_matches = source_keys.iter().filter(|key| glob.matches(key)).cloned().collect::<Vec<_>>();
if !invalid_matches.is_empty() {
notifications.push(send_invalid_input_pattern_match(tx.clone(), pattern.clone(), invalid_matches).boxed())
}
}
}
for pattern in patterns.for_outputs.iter() {
let invalid_matches = sink_keys.iter().filter(|key| pattern.matches_glob(key)).cloned().collect::<Vec<_>>();
if !invalid_matches.is_empty() {
notifications.push(send_invalid_output_pattern_match(tx.clone(), pattern.clone(), invalid_matches).boxed())
if let Ok(glob) = glob::Pattern::new(pattern) {
let invalid_matches = sink_keys.iter().filter(|key| glob.matches(key)).cloned().collect::<Vec<_>>();
if !invalid_matches.is_empty() {
notifications.push(send_invalid_output_pattern_match(tx.clone(), pattern.clone(), invalid_matches).boxed())
}
}
}

Expand Down

0 comments on commit 67d2e47

Please sign in to comment.