Skip to content

Commit

Permalink
chore(loki sink): warn on label expansions and collisions (vectordotd…
Browse files Browse the repository at this point in the history
  • Loading branch information
hargut authored Apr 14, 2023
1 parent 730c938 commit f06692b
Showing 1 changed file with 151 additions and 23 deletions.
174 changes: 151 additions & 23 deletions src/sinks/loki/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,35 +163,94 @@ pub(super) struct EventEncoder {

impl EventEncoder {
fn build_labels(&self, event: &Event) -> Vec<(String, String)> {
let mut static_vec: HashMap<String, String> = HashMap::new();
let mut dynamic_vec: HashMap<String, String> = HashMap::new();
let mut static_labels: HashMap<String, String> = HashMap::new();
let mut dynamic_labels: HashMap<String, String> = HashMap::new();

for (key_template, value_template) in self.labels.iter() {
if let (Ok(key), Ok(value)) = (
key_template.render_string(event),
value_template.render_string(event),
) {
if let Some(opening_prefix) = key.strip_suffix('*') {
let output: Result<serde_json::map::Map<String, serde_json::Value>, _> =
serde_json::from_str(value.as_str());

if let Ok(output) = output {
// key_* -> key_one, key_two, key_three
// * -> one, two, three
for (k, v) in output {
dynamic_vec.insert(
slugify_text(format!("{}{}", opening_prefix, k)),
Value::from(v).to_string_lossy().into_owned(),
);
}
let key = key_template.render_string(event);
let value = value_template.render_string(event);

if key.is_err() || value.is_err() {
if key.is_err() {
emit!(TemplateRenderingError {
field: Some(
format!(
"label_key \"{}\" with label_value \"{}\"",
key_template, value_template
)
.as_str()
),
drop_event: false,
error: key.err().unwrap(),
});
}
if value.is_err() {
emit!(TemplateRenderingError {
field: Some(
format!(
"label_value \"{}\" with label_key \"{}\"",
value_template, key_template
)
.as_str()
),
drop_event: false,
error: value.err().unwrap(),
});
}
continue;
}

let key_s = key.unwrap();
let value_s = value.unwrap();

if let Some(opening_prefix) = key_s.strip_suffix('*') {
let output: Result<
serde_json::map::Map<String, serde_json::Value>,
serde_json::Error,
> = serde_json::from_str(value_s.clone().as_str());

if output.is_err() {
warn!(
"Failed to expand dynamic label. value: {}, err: {}",
value_s,
output.err().unwrap()
);
continue;
}

// key_* -> key_one, key_two, key_three
// * -> one, two, three
for (k, v) in output.unwrap() {
let key = slugify_text(format!("{}{}", opening_prefix, k));
let val = Value::from(v).to_string_lossy().into_owned();
if val == "<null>" {
warn!("Encountered \"null\" value for dynamic label. key: {}", key);
continue;
}
} else {
static_vec.insert(key, value);
if let Some(prev) = dynamic_labels.insert(key.clone(), val.clone()) {
warn!(
"Encountered duplicated dynamic label. \
key: {}, value: {}, discarded value: {}",
key, val, prev
);
};
}
} else {
static_labels.insert(key_s, value_s);
}
}
dynamic_vec.extend(static_vec);
Vec::from_iter(dynamic_vec)

for (k, v) in static_labels {
if let Some(discarded_v) = dynamic_labels.insert(k.clone(), v.clone()) {
warn!(
"Static label overrides dynamic label. \
key: {}, value: {}, discarded value: {}",
k, v, discarded_v
);
};
}

Vec::from_iter(dynamic_labels)
}

fn remove_label_fields(&self, event: &mut Event) {
Expand Down Expand Up @@ -618,6 +677,75 @@ mod tests {
Ok(())
}

#[test]
fn encoder_with_colliding_dynamic_labels() -> Result<(), serde_json::Error> {
let mut labels = HashMap::default();
labels.insert(
Template::try_from("l1_*").unwrap(),
Template::try_from("{{ map1 }}").unwrap(),
);
labels.insert(
Template::try_from("*").unwrap(),
Template::try_from("{{ map2 }}").unwrap(),
);

let mut encoder = EventEncoder {
key_partitioner: KeyPartitioner::new(None),
transformer: Default::default(),
encoder: Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
labels,
remove_label_fields: false,
remove_timestamp: false,
};

let message = r###"
{
"map1": {
"key1": "val1"
},
"map2": {
"l1_key1": "val2"
}
}
"###;
let msg: BTreeMap<String, Value> = serde_json::from_str(message)?;
let event = Event::Log(LogEvent::from(msg));
let record = encoder.encode_event(event).unwrap();

assert_eq!(record.labels.len(), 1);
let labels: HashMap<String, String> = record.labels.into_iter().collect();
// EventEncoder.labels is type HashMap (unordered) -> both values can be valid
assert!(vec!["val1".to_string(), "val2".to_string()].contains(&labels["l1_key1"]));
Ok(())
}

#[test]
fn encoder_with_failing_dynamic_label_expansion() -> Result<(), serde_json::Error> {
let mut labels = HashMap::default();
labels.insert(
Template::try_from("missing_*").unwrap(),
Template::try_from("{{ map }}").unwrap(),
);

let mut encoder = EventEncoder {
key_partitioner: KeyPartitioner::new(None),
transformer: Default::default(),
encoder: Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
labels,
remove_label_fields: false,
remove_timestamp: false,
};

let msg: BTreeMap<String, Value> = serde_json::from_str("{}")?;
let event = Event::Log(LogEvent::from(msg));
let record = encoder.encode_event(event).unwrap();

assert_eq!(record.labels.len(), 1);
let labels: HashMap<String, String> = record.labels.into_iter().collect();
assert_eq!(labels["agent"], "vector".to_string());
Ok(())
}

#[test]
fn encoder_no_ts() {
let mut encoder = EventEncoder {
Expand Down

0 comments on commit f06692b

Please sign in to comment.