enhancement(loki sink): Partition HTTP requests to loki by stream & re-enable concurrency #8615

Merged: 18 commits, Aug 21, 2021
51 changes: 39 additions & 12 deletions src/sinks/loki.rs
@@ -5,9 +5,8 @@
//!
//! <https://github.com/grafana/loki/blob/master/docs/api.md>
//!
//! This sink does not use `PartitionBatching` but elects to do
//! stream multiplexing by organizing the streams in the `build_request`
//! phase. There must be at least one valid set of labels.
//! This sink uses `PartitionBatching` to partition events
//! by streams. There must be at least one valid set of labels.
//!
//! If an event produces no labels (this can happen if the template
//! does not match), we will add a default label `{agent="vector"}`.
@@ -20,9 +19,8 @@ use crate::{
buffer::loki::{GlobalTimestamps, LokiBuffer, LokiEvent, LokiRecord, PartitionKey},
encoding::{EncodingConfig, EncodingConfiguration},
http::{HttpSink, PartitionHttpSink},
service::ConcurrencyOption,
BatchConfig, BatchSettings, PartitionBuffer, PartitionInnerBuffer, TowerRequestConfig,
UriSerde,
BatchConfig, BatchSettings, Concurrency, PartitionBuffer, PartitionInnerBuffer,
TowerRequestConfig, UriSerde,
},
template::Template,
tls::{TlsOptions, TlsSettings},
@@ -106,11 +104,10 @@ impl SinkConfig for LokiConfig {
}
}

if self.request.concurrency.is_some() {
warn!("Option `request.concurrency` is not supported.");
}
let mut request_settings = self.request.unwrap_with(&TowerRequestConfig::default());
request_settings.concurrency = Some(1);
let request_settings = self.request.unwrap_with(&TowerRequestConfig {
concurrency: Concurrency::Fixed(5),
..Default::default()
});

let batch_settings = BatchSettings::default()
.bytes(102_400)
@@ -139,6 +136,7 @@ impl SinkConfig for LokiConfig {
client.clone(),
cx.acker(),
)
.ordered()
.sink_map_err(|error| error!(message = "Fatal loki sink error.", %error));

let healthcheck = healthcheck(config, client).boxed();
@@ -199,7 +197,6 @@ impl HttpSink for LokiSink {
})
.ok()
});
let key = PartitionKey { tenant_id };

let mut labels = Vec::new();

Expand Down Expand Up @@ -249,6 +246,8 @@ impl HttpSink for LokiSink {
labels = vec![("agent".to_string(), "vector".to_string())]
}

let key = PartitionKey::new(tenant_id, &mut labels);

let event = LokiEvent { timestamp, event };
Some(PartitionInnerBuffer::new(
LokiRecord {
@@ -873,6 +872,33 @@ mod integration_tests {
.await;
}

#[tokio::test]
async fn out_of_order_per_partition() {
let batch_size = 2;
let big_lines = random_lines(1_000_000).take(2);
let small_lines = random_lines(1).take(20);
let mut events = big_lines
.into_iter()
.chain(small_lines)
.map(Event::from)
.collect::<Vec<_>>();

let base = chrono::Utc::now() - Duration::seconds(30);
for (i, event) in events.iter_mut().enumerate() {
let log = event.as_mut_log();
log.insert(
log_schema().timestamp_key(),
base + Duration::seconds(i as i64),
);
}

// Since all of the events are in the same partition, if there is concurrency and
// ordering inside partitions isn't upheld, the big-line events will take longer
// to flush than the small-line events, so loki will receive the smaller ones
// before the large ones, hence out-of-order events.
test_out_of_order_events(OutOfOrderAction::Drop, batch_size, events.clone(), events).await;
Comment on lines +895 to +899

Member:

Hmm, this test is fundamentally racy, isn't it? Is there any way to provoke out of order events without racing?


Contributor Author:

It is.

> Is there any way to provoke out of order events without racing?

Not really. There are two more ways to provoke them:

  • Fail requests and force them to retry.
    • This isn't quite testing what it needs to, since retry logic can correct this.
    • A mock loki server which we can configure to fail requests would be needed (see the sketch below).
  • Stall the start of the server so that requests pile up.
    • This is yet again racy.

What we can do is increase the number of events, to raise the chance of an out-of-order event actually occurring.
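
For reference, a minimal sketch of that mock-server idea (hypothetical, not part of this PR): a stub of Loki's push endpoint that fails the first few requests with a 500 to force the sink's retry path. The crate choice (`warp`), the port, and the failure budget are all illustrative assumptions.

```rust
// Hypothetical stub of Loki's push endpoint: reject the first two
// requests with 500 so the sink must retry, then accept everything.
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

use warp::{http::StatusCode, Filter};

#[tokio::main]
async fn main() {
    // Budget of requests to reject before the endpoint starts succeeding.
    let remaining_failures = Arc::new(AtomicUsize::new(2));

    let push = warp::post()
        .and(warp::path!("loki" / "api" / "v1" / "push"))
        .map(move || {
            // Decrement the budget; once it hits zero, stop failing.
            let should_fail = remaining_failures
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| n.checked_sub(1))
                .is_ok();
            if should_fail {
                warp::reply::with_status("try again", StatusCode::INTERNAL_SERVER_ERROR)
            } else {
                warp::reply::with_status("", StatusCode::NO_CONTENT)
            }
        });

    warp::serve(push).run(([127, 0, 0, 1], 3100)).await;
}
```

As noted above, though, retries can end up correcting the ordering themselves, so the test here sticks with the size-skew approach and simply makes out-of-order delivery as likely as possible.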

}

async fn test_out_of_order_events(
action: OutOfOrderAction,
batch_size: usize,
@@ -897,6 +923,7 @@ mod integration_tests {
Template::try_from(stream.to_string()).unwrap(),
);
config.batch.max_events = Some(batch_size);
config.batch.max_bytes = Some(4_000_000);

let (sink, _) = config.build(cx).await.unwrap();
sink.into_sink()
Expand Down
167 changes: 62 additions & 105 deletions src/sinks/util/buffer/loki.rs
@@ -54,35 +54,48 @@ impl From<&LokiEvent> for LokiEncodedEvent {
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub struct PartitionKey {
pub tenant_id: Option<String>,
labels: String,
}

impl PartitionKey {
pub fn new(tenant_id: Option<String>, labels: &mut Labels) -> Self {
// Join all of the labels into a single string so that
// cloning requires only a single allocation.
// This requires sorting, so that the same label set always
// produces the same key, and separators that are unlikely
// to appear in either a name or a value.
labels.sort();
PartitionKey {
tenant_id,
labels: labels.iter().flat_map(|(a, b)| [a, "→", b, "∇"]).collect(),
}
}
}

#[derive(Debug, Default, Clone)]
pub struct GlobalTimestamps {
map: Arc<DashMap<PartitionKey, HashMap<Labels, i64>>>,
map: Arc<DashMap<PartitionKey, i64>>,
}

impl GlobalTimestamps {
pub fn take(&self, partition: &PartitionKey) -> HashMap<Labels, i64> {
self.map
.remove(partition)
.map(|(_k, v)| v)
.unwrap_or_default()
pub fn take(&self, partition: &PartitionKey) -> Option<i64> {
self.map.remove(partition).map(|(_k, v)| v)
}

pub fn insert(&self, partition: PartitionKey, map: HashMap<Labels, i64>) {
self.map.insert(partition, map);
pub fn insert(&self, partition: PartitionKey, timestamp: i64) {
self.map.insert(partition, timestamp);
}
}

#[derive(Debug)]
pub struct LokiBuffer {
num_bytes: usize,
num_items: usize,
streams: HashMap<Labels, Vec<LokiEncodedEvent>>,
stream: Vec<LokiEncodedEvent>,
settings: BatchSize<Self>,

partition: Option<PartitionKey>,
latest_timestamps: Option<HashMap<Labels, i64>>,
partition: Option<(PartitionKey, Labels)>,
latest_timestamp: Option<Option<i64>>,
global_timestamps: GlobalTimestamps,
out_of_order_action: OutOfOrderAction,
}
@@ -96,10 +109,10 @@ impl LokiBuffer {
Self {
num_bytes: WRAPPER_OVERHEAD,
num_items: 0,
streams: HashMap::new(),
stream: Vec::new(),
settings,
partition: None,
latest_timestamps: None,
latest_timestamp: None,
global_timestamps,
out_of_order_action,
}
@@ -129,16 +142,13 @@ impl Batch for LokiBuffer {
item.labels.sort_unstable();

let partition = &item.partition;
if self.latest_timestamps.is_none() {
self.partition = Some(item.partition.clone());
self.latest_timestamps = Some(self.global_timestamps.take(partition));
if self.latest_timestamp.is_none() {
self.partition = Some((item.partition.clone(), item.labels.clone()));
self.latest_timestamp = Some(self.global_timestamps.take(partition));
}
let latest_timestamp = self
.latest_timestamps
.as_ref()
.latest_timestamp
.unwrap()
.get(&item.labels)
.cloned()
.unwrap_or(item.event.timestamp);
if item.event.timestamp < latest_timestamp {
match self.out_of_order_action {
@@ -178,20 +188,19 @@
{
PushResult::Overflow(item)
} else {
let new_bytes = match self.streams.get_mut(&item.labels) {
// Label exists, and we checked the size, just add it
Some(stream) => {
stream.push(event);
let new_bytes = match self.stream.is_empty() {
// Event was already added, and we checked the size, just add it
false => {
self.stream.push(event);
event_len + 1
}
None => {
true => {
// Have to verify label size doesn't cause overflow
let new_bytes =
labels_len + event_len + if self.streams.is_empty() { 0 } else { 1 };
let new_bytes = labels_len + event_len;
if self.num_bytes + new_bytes > self.settings.bytes {
return PushResult::Overflow(item);
} else {
self.streams.insert(item.labels, vec![event]);
self.stream.push(event);
new_bytes
}
}
@@ -203,7 +212,7 @@
}

fn is_empty(&self) -> bool {
self.streams.is_empty()
self.stream.is_empty()
}

fn fresh(&self) -> Self {
@@ -215,51 +224,28 @@
}

fn finish(self) -> Self::Output {
let mut latest_timestamps = self.latest_timestamps.expect("Batch is empty");
let streams_json = self
.streams
.into_iter()
.map(|(labels, mut events)| {
// Sort events by timestamp
events.sort_by_key(|e| e.timestamp);

latest_timestamps.insert(
labels.clone(),
events.last().expect("Batch is empty").timestamp,
);

let labels = labels.into_iter().collect::<HashMap<_, _>>();
let events = events.into_iter().map(|e| e.encoded).collect::<Vec<_>>();

(
to_raw_value(&labels).expect("JSON encoding should never fail"),
events,
)
})
.collect::<Vec<_>>();
self.global_timestamps
.insert(self.partition.expect("Batch is empty"), latest_timestamps);

// This is just to guarantee stable key ordering for tests
#[cfg(test)]
let streams_json = {
let mut streams_json = streams_json;
streams_json.sort_unstable_by_key(|s| s.0.to_string());
streams_json
};

let streams_json = streams_json
.into_iter()
.map(|(stream, events)| {
json!({
"stream": stream,
"values": events,
})
})
.collect::<Vec<_>>();
let (partition, labels) = self.partition.expect("Batch is empty");

let mut events = self.stream;
// Sort events by timestamp
events.sort_by_key(|e| e.timestamp);

let latest_timestamp = events.last().expect("Batch is empty").timestamp;
let events = events.into_iter().map(|e| e.encoded).collect::<Vec<_>>();

let labels = labels
.iter()
.map(|&(ref key, ref value)| (key, value))
.collect::<HashMap<_, _>>();
let stream = to_raw_value(&labels).expect("JSON encoding should never fail");

self.global_timestamps.insert(partition, latest_timestamp);

json!({
"streams": streams_json,
"streams": vec![json!({
"stream": stream,
"values": events,
})]
})
}

@@ -295,7 +281,7 @@ mod tests {
);
assert!(matches!(
buffer.push(LokiRecord {
partition: PartitionKey { tenant_id: None },
partition: PartitionKey::new(None, &mut vec![("label1".into(), "value1".into())]),
labels: vec![("label1".into(), "value1".into())],
event: LokiEvent {
timestamp: 123456789,
@@ -306,42 +292,13 @@
));

assert_eq!(buffer.num_items, 1);
assert_eq!(buffer.streams.len(), 1);
assert!(!buffer.stream.is_empty());
test_finish(
buffer,
r#"{"streams":[{"stream":{"label1":"value1"},"values":[["123456789","this is an event"]]}]}"#,
);
}

#[test]
fn insert_multiple_streams() {
let mut buffer = LokiBuffer::new(
BatchSettings::default().size,
Default::default(),
Default::default(),
);
for n in 1..4 {
assert!(matches!(
buffer.push(LokiRecord {
partition: PartitionKey { tenant_id: None },
labels: vec![("asdf".into(), format!("value{}", n))],
event: LokiEvent {
timestamp: 123456780 + n,
event: format!("event #{}", n),
},
}),
PushResult::Ok(false)
));
}

assert_eq!(buffer.num_items, 3);
assert_eq!(buffer.streams.len(), 3);
test_finish(
buffer,
r#"{"streams":[{"stream":{"asdf":"value1"},"values":[["123456781","event #1"]]},{"stream":{"asdf":"value2"},"values":[["123456782","event #2"]]},{"stream":{"asdf":"value3"},"values":[["123456783","event #3"]]}]}"#,
);
}

#[test]
fn insert_multiple_one_stream() {
let mut buffer = LokiBuffer::new(
@@ -352,7 +309,7 @@
for n in 1..4 {
assert!(matches!(
buffer.push(LokiRecord {
partition: PartitionKey { tenant_id: None },
partition: PartitionKey::new(None, &mut vec![("asdf".into(), "value1".into())]),
labels: vec![("asdf".into(), "value1".into())],
event: LokiEvent {
timestamp: 123456780 + n,
@@ -364,7 +321,7 @@
}

assert_eq!(buffer.num_items, 3);
assert_eq!(buffer.streams.len(), 1);
assert!(!buffer.stream.is_empty());
test_finish(
buffer,
r#"{"streams":[{"stream":{"asdf":"value1"},"values":[["123456781","event #1"],["123456782","event #2"],["123456783","event #3"]]}]}"#,
6 changes: 6 additions & 0 deletions src/sinks/util/http.rs
@@ -291,6 +291,12 @@
slot: None,
}
}

/// Enforces per-partition ordering of requests.
pub fn ordered(mut self) -> Self {
self.inner.ordered();
self
}
}

impl<T, B, K, RL> Sink<Event> for PartitionHttpSink<T, B, K, RL>