diff --git a/src/sinks/datadog/logs/log_api.rs b/src/sinks/datadog/logs/log_api.rs
index 52bad2e37e42b..843f0ea21ea8b 100644
--- a/src/sinks/datadog/logs/log_api.rs
+++ b/src/sinks/datadog/logs/log_api.rs
@@ -16,6 +16,7 @@ use std::collections::BTreeMap;
 use std::collections::HashMap;
 use std::hash::BuildHasherDefault;
 use std::io::Write;
+use tokio::sync::mpsc::Receiver;
 use tokio::time::{self, Duration};
 use tower::Service;
 use twox_hash::XxHash64;
@@ -24,6 +25,7 @@ use vector_core::event::{Event, EventFinalizers, Value};
 use vector_core::sink::StreamSink;
 use vector_core::ByteSizeOf;
 
+mod batcher;
 mod builder;
 mod common;
 mod errors;
@@ -279,6 +281,51 @@ where
     }
 }
 
+// /// Run the IO loop of this sink
+// ///
+// /// This sink is busy doing two things: encoding `Event` instances into
+// /// Datadog logs payloads and then shunting these off to Datadog. Mixing the
+// /// two causes a good deal of lifetime hassle in this sink, not to mention
+// /// we don't light up CPUs like we might otherwise.
+// async fn run_io<Client>(
+//     mut requests: Receiver<(Vec<EventFinalizers>, Request)>,
+//     mut http_client: Client,
+// ) where
+//     Client: Service<Request> + Send + Unpin,
+//     Client::Future: Send,
+//     Client::Response: Send,
+//     Client::Error: Send,
+// {
+//     let mut flushes = FuturesUnordered::new();
+//     loop {
+//         tokio::select! {
+//             Some(()) = flushes.next() => {
+//                 // nothing, intentionally
+//             }
+//             Some((finalizers, request)) = requests.recv() => {
+//                 let fut = http_client.call(request).map(move |result| {
+//                     let status: EventStatus = match result {
+//                         Ok(_) => {
+//                             metrics::counter!("flush_success", 1);
+//                             EventStatus::Delivered
+//                         }
+//                         Err(_) => {
+//                             metrics::counter!("flush_error", 1);
+//                             EventStatus::Errored
+//                         }
+//                     };
+//                     for finalizer in finalizers {
+//                         finalizer.update_status(status);
+//                     }
+//                     metrics::counter!("processed_bytes_total", flush_metrics.processed_bytes_total);
+//                     metrics::counter!(
+//                         "processed_events_total",
+//                         flush_metrics.processed_events_total
+//                     );
+//                 });
+//                 flushes.push(fut);
+//             }
+//         }
+//     }
+// }
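+//
+// A sketch of how the two halves might be wired together once `run_io` is
+// enabled. The channel capacity below is an arbitrary assumption for
+// illustration, not something this change fixes:
+//
+//     let (tx, rx) = tokio::sync::mpsc::channel(64);
+//     tokio::spawn(run_io(rx, http_client));
+//     // ...the encoding half then sends finished payloads:
+//     // tx.send((finalizers, request)).await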
+
 #[async_trait]
 impl<Client> StreamSink for LogApi<Client>
 where
@@ -298,14 +345,16 @@
             .await
             .map_err(|_e| ())?;
 
-        let mut input = input.map(|mut event| {
-            let log = event.as_mut_log();
-            log.rename_key_flat(message_key, "message");
-            log.rename_key_flat(timestamp_key, "date");
-            log.rename_key_flat(host_key, "host");
-            encoding.apply_rules(&mut event);
-            event
-        });
+        let mut input = input
+            .map(|mut event| {
+                let log = event.as_mut_log();
+                log.rename_key_flat(message_key, "message");
+                log.rename_key_flat(timestamp_key, "date");
+                log.rename_key_flat(host_key, "host");
+                encoding.apply_rules(&mut event);
+                event
+            })
+            .fuse();
 
         let mut interval = time::interval(self.timeout);
         let mut flushes = FuturesUnordered::new();
diff --git a/src/sinks/datadog/logs/log_api/batcher.rs b/src/sinks/datadog/logs/log_api/batcher.rs
new file mode 100644
index 0000000000000..2975239df6332
--- /dev/null
+++ b/src/sinks/datadog/logs/log_api/batcher.rs
@@ -0,0 +1,123 @@
+use futures::stream::Stream;
+use futures::StreamExt;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use vector_core::event::Event;
+
+pub struct Batcher<'a, St: ?Sized> {
+    stream: &'a mut St,
+}
+
+impl<St> Unpin for Batcher<'_, St> where St: ?Sized + Unpin {}
+
+impl<'a, St> Batcher<'a, St>
+where
+    St: ?Sized + Stream<Item = Event> + Unpin,
+{
+    pub fn new(stream: &'a mut St) -> Self {
+        Self { stream }
+    }
+}
+
+impl<St> Future for Batcher<'_, St>
+where
+    St: ?Sized + Stream<Item = Event> + Unpin,
+{
+    type Output = Option<Event>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        self.stream.poll_next_unpin(cx)
+    }
+}
+
+// use crate::sinks::datadog::logs::log_api::common;
+// use futures::Stream;
+// use std::collections::HashMap;
+// use std::hash::BuildHasherDefault;
+// use std::pin::Pin;
+// use std::task::{Context, Poll};
+// use twox_hash::XxHash64;
+// use vector_core::event::Event;
+
+// const MAX_PAYLOAD_ARRAY: usize = 1_000;
+
+// /// Batch incoming `Event` instances up for payload serialization
+// ///
+// /// Datadog Log API payloads have a few constraints. Each payload must have
+// /// no more than 1_000 members and payloads must not exceed 5MB in size
+// /// before compression. Every member in the payload must also ship with the
+// /// same API key, meaning batches are constructed per-key. The API makes no
+// /// restriction on how often we can call it, nor is there a minimum payload
+// /// size.
+// ///
+// /// This structure confines itself to concerns about element totals and
+// /// timing out if the stream of `Event`s for a particular key is slow.
+// struct Batcher<'a> {
+//     /// The default Datadog API key to use
+//     ///
+//     /// In some instances an `Event` will come in on the stream with an
+//     /// associated API key. That API key is the one it'll get batched up by
+//     /// but otherwise we will see `Event` instances with no associated key.
+//     /// In that case we batch them by this default.
+//     ///
+//     /// Note that this is a `u64` and not a `Box<str>` or similar. This sink
+//     /// stores all API keys in a slab and only materializes the actual API
+//     /// key when needed.
+//     default_api_key: u64,
+//     /// The slab of API keys
+//     ///
+//     /// This slab holds the actual materialized API key in the form of a
+//     /// `Box<str>`. This avoids having lots of little strings running around
+//     /// with the downside of being an unbounded structure, in the present
+//     /// implementation.
+//     key_slab: HashMap<u64, Box<str>, BuildHasherDefault<XxHash64>>,
+//     /// The batches of `Event` instances, sorted by API key
+//     event_batches: HashMap<u64, Vec<Event>, BuildHasherDefault<XxHash64>>,
+//     /// The interior stream to wrap
+//     inner: Box<dyn Stream<Item = Event> + 'a>,
+// }
+
+// impl<'a> Batcher<'a> {
+//     fn batch(default_api_key: Box<str>, input: impl Stream<Item = Event> + 'a) -> Self {
+//         let mut key_slab = HashMap::default();
+//         let default_key_id = common::hash(&default_api_key);
+//         key_slab.insert(default_key_id, default_api_key);
+
+//         Self {
+//             default_api_key: default_key_id,
+//             key_slab,
+//             event_batches: HashMap::default(),
+//             inner: Box::new(input),
+//         }
+//     }
+
+//     /// Calculates and stores the API key ID of an `Event`
+//     ///
+//     /// This function calculates the API key ID of a given `Event`. As a
+//     /// side-effect it mutates internal state of the struct, allowing
+//     /// callers to use the ID to retrieve a `Box<str>` of the key at a later
+//     /// time.
+//     fn register_key_id(&mut self, event: &Event) -> u64 {
+//         if let Some(api_key) = event.metadata().datadog_api_key() {
+//             let key = api_key.as_ref();
+//             let key_hash = common::hash(key);
+//             // TODO it'd be nice to avoid passing through String
+//             self.key_slab
+//                 .entry(key_hash)
+//                 .or_insert_with(|| String::from(key).into_boxed_str());
+//             key_hash
+//         } else {
+//             self.default_api_key
+//         }
+//     }
+// }
+
+// impl<'a> Stream for Batcher<'a> {
+//     type Item = (Box<str>, Vec<Event>);
+
+//     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+//         unimplemented!()
+//     }
+
+//     // fn size_hint(&self) -> (usize, Option<usize>) { ... }
+// }
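+
+// A usage sketch for the live `Batcher` future above. It borrows the stream,
+// so the caller can interleave it with a flush timer in a `select!`; `input`
+// and `interval` here stand in for the fused stream and timer owned by the
+// run loop in `log_api.rs` (assumed names, illustrative only):
+//
+//     loop {
+//         tokio::select! {
+//             maybe_event = Batcher::new(&mut input) => match maybe_event {
+//                 Some(event) => { /* push `event` into its per-key batch */ }
+//                 None => break, // input exhausted: flush remaining batches
+//             },
+//             _ = interval.tick() => { /* timeout: flush partial batches */ }
+//         }
+//     }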