
chore: Rework the Datadog logs sink #8825

Merged Sep 16, 2021 · 44 commits · Changes from 32 commits
dd8d0cf
Initial work on a different take on the DD Log API
blt Aug 20, 2021
615a432
WIP
blt Aug 20, 2021
0139ae9
Minimal viability
blt Aug 21, 2021
7f61aa7
tidy up dead code
blt Aug 22, 2021
dc89755
checkpoint
blt Aug 23, 2021
9012d81
checkpoint
blt Aug 24, 2021
350880b
e2e acking
blt Aug 24, 2021
9738667
telemetry
blt Aug 24, 2021
d23ca17
checkpoint
blt Aug 25, 2021
79c5dc6
re-introduce has_space
blt Aug 25, 2021
8c51560
checkpoint
blt Aug 25, 2021
6b2929a
checkpoint
blt Aug 25, 2021
69e2cb6
Initial work to include layers on client
blt Aug 25, 2021
be7926c
Update with master
blt Aug 26, 2021
a8f37c1
checkpoint batcher stream processor
blt Aug 27, 2021
6feb015
checkpoint
blt Aug 31, 2021
4929050
avoid making client cloneable
blt Sep 1, 2021
bb35df3
one test corrected
blt Sep 1, 2021
6cb8004
the tests, they pass
blt Sep 1, 2021
070dadc
retry and layering
blt Sep 2, 2021
3e4678f
clippy dings
blt Sep 2, 2021
562f6b9
get retry introduced
blt Sep 2, 2021
945121f
get updated with master
blt Sep 3, 2021
bf73277
healthcheck re-introduced
blt Sep 4, 2021
86ab2d6
Merge branch 'master' into partition_buffer
blt Sep 9, 2021
a310307
first compiling rework
blt Sep 10, 2021
2b02731
un-dirty
blt Sep 10, 2021
37dd968
clippy dings
blt Sep 10, 2021
53e170e
tidy
blt Sep 10, 2021
88f790f
light up the CPUs more
blt Sep 11, 2021
731d9ab
undo const failure
blt Sep 11, 2021
8cb895a
fix tests
blt Sep 13, 2021
0029dec
Detect a 'payload too big' situation
blt Sep 13, 2021
b0056ed
lower batch goal
blt Sep 13, 2021
6cd60ba
language update
blt Sep 14, 2021
f37fbcb
API key must now be provided
blt Sep 14, 2021
3561670
less optional key
blt Sep 14, 2021
6368d90
no sleep, use future interface
blt Sep 14, 2021
0d55836
const default timeout
blt Sep 14, 2021
7c8dda0
remove max_bytes from cue docs
blt Sep 14, 2021
7961f79
Merge branch 'master' into partition_buffer
blt Sep 14, 2021
f8fa5d5
Add details on `max_bytes` removal
blt Sep 14, 2021
ed3ec1a
merge master
blt Sep 14, 2021
14d15ef
Merge branch 'master' into partition_buffer
blt Sep 15, 2021
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -216,6 +216,7 @@ encoding_rs = { version = "0.8.28", features = ["serde"] }
evmap = { version = "10.0.2", default-features = false, optional = true }
exitcode = { version = "1.1.2", default-features = false }
flate2 = { version = "1.0.21", default-features = false }
futures-util = { version = "0.3.17", default-features = false }
getset = { version = "0.1.1", default-features = false }
glob = { version = "0.3.0", default-features = false }
grok = { version = "1.2.0", default-features = false, optional = true }
12 changes: 8 additions & 4 deletions lib/vector-core/src/event/finalization.rs
@@ -260,7 +260,8 @@ pub enum BatchStatus {
}

impl BatchStatus {
/// Update this status with another batch's delivery status, and return the result.
/// Update this status with another batch's delivery status, and return the
/// result.
#[allow(clippy::match_same_arms)] // False positive: https://github.com/rust-lang/rust-clippy/issues/860
fn update(self, status: EventStatus) -> Self {
match (self, status) {
@@ -281,21 +282,24 @@ impl BatchStatus {
#[derivative(Default)]
#[repr(u8)]
pub enum EventStatus {
/// All copies of this event were dropped without being finalized (the default).
/// All copies of this event were dropped without being finalized (the
/// default).
#[derivative(Default)]
Dropped,
/// All copies of this event were delivered successfully.
Delivered,
/// At least one copy of this event encountered a retriable error.
Errored,
/// At least one copy of this event encountered a permanent failure or rejection.
/// At least one copy of this event encountered a permanent failure or
/// rejection.
Failed,
/// This status has been recorded and should not be updated.
Recorded,
}

impl EventStatus {
/// Update this status with another event's finalization status and return the result.
/// Update this status with another event's finalization status and return
/// the result.
///
/// # Panics
///
2 changes: 1 addition & 1 deletion src/http.rs
@@ -149,7 +149,7 @@ impl<B> Service<Request<B>> for HttpClient<B>
where
B: fmt::Debug + HttpBody + Send + 'static,
B::Data: Send,
B::Error: Into<crate::Error>,
B::Error: Into<crate::Error> + Send,
{
type Response = http::Response<Body>;
type Error = HttpError;
120 changes: 49 additions & 71 deletions src/sinks/datadog/logs/config.rs
@@ -1,23 +1,29 @@
use super::service::LogApiRetry;
use crate::config::{DataType, GenerateConfig, SinkConfig, SinkContext};
use crate::http::HttpClient;
use crate::sinks::datadog::logs::healthcheck::healthcheck;
use crate::sinks::datadog::logs::service;
use crate::sinks::datadog::ApiKey;
use crate::sinks::datadog::logs::service::LogApiService;
use crate::sinks::datadog::logs::sink::LogSink;
use crate::sinks::datadog::Region;
use crate::sinks::util::encoding::EncodingConfigWithDefault;
use crate::sinks::util::{
batch::{Batch, BatchError},
http::{HttpSink, PartitionHttpSink},
BatchConfig, BatchSettings, Compression, JsonArrayBuffer, PartitionBuffer,
PartitionInnerBuffer, TowerRequestConfig,
};
use crate::sinks::util::service::ServiceBuilderExt;
use crate::sinks::util::Concurrency;
use crate::sinks::util::{BatchConfig, Compression, TowerRequestConfig};
use crate::sinks::{Healthcheck, VectorSink};
use crate::tls::{MaybeTlsSettings, TlsConfig};
use futures::{FutureExt, SinkExt};
use futures::FutureExt;
use indoc::indoc;
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
use std::{sync::Arc, time::Duration};
use std::sync::Arc;
use std::time::Duration;
use tower::ServiceBuilder;
use vector_core::config::proxy::ProxyConfig;

const DEFAULT_REQUEST_LIMITS: TowerRequestConfig = {
TowerRequestConfig::const_new(Concurrency::Fixed(50), Concurrency::Fixed(50))
.rate_limit_num(250)
};
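
The `DEFAULT_REQUEST_LIMITS` constant above feeds the `self.request.unwrap_with(&DEFAULT_REQUEST_LIMITS)` call later in the diff. A minimal sketch of that defaults-merging pattern, using a hypothetical `RequestConfig`/`RequestSettings` pair that is illustrative only and not Vector's real `TowerRequestConfig`:

```rust
// Hypothetical stand-ins for Vector's request config types: user-supplied
// fields are optional, and `unwrap_with` fills any unset field from a
// compile-time default.
#[derive(Clone, Copy)]
struct RequestConfig {
    concurrency: Option<usize>,
    rate_limit_num: Option<u64>,
}

#[derive(Clone, Copy)]
struct RequestSettings {
    concurrency: usize,
    rate_limit_num: u64,
}

impl RequestConfig {
    // Usable in a `const` item, mirroring `TowerRequestConfig::const_new`.
    const fn const_new(concurrency: usize, rate_limit_num: u64) -> Self {
        Self {
            concurrency: Some(concurrency),
            rate_limit_num: Some(rate_limit_num),
        }
    }

    // Merge: prefer the user's value, then the sink's default.
    fn unwrap_with(&self, defaults: &RequestConfig) -> RequestSettings {
        RequestSettings {
            concurrency: self.concurrency.or(defaults.concurrency).unwrap_or(5),
            rate_limit_num: self.rate_limit_num.or(defaults.rate_limit_num).unwrap_or(5),
        }
    }
}

// Mirrors the diff's defaults: fixed concurrency of 50, 250 requests/s.
const DEFAULT_REQUEST_LIMITS: RequestConfig = RequestConfig::const_new(50, 250);
```

This lets a user override only the fields they care about while the sink supplies opinionated defaults for the rest.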

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
@@ -81,82 +87,54 @@ impl DatadogLogsConfig {
});
http::Uri::try_from(endpoint).expect("URI not valid")
}
}

fn batch_settings<T: Batch>(&self) -> Result<BatchSettings<T>, BatchError> {
BatchSettings::default()
.bytes(bytesize::mib(5_u32))
.events(1_000)
.timeout(15)
.parse_config(self.batch)
}

/// Builds the required BatchedHttpSink.
/// Since the DataDog sink can create one of two different sinks, this
/// extracts most of the shared functionality required to create either sink.
fn build_sink<T, B, O>(
impl DatadogLogsConfig {
pub fn build_processor(
&self,
client: HttpClient,
cx: SinkContext,
service: T,
batch: B,
timeout: Duration,
) -> crate::Result<(VectorSink, Healthcheck)>
where
O: 'static,
B: Batch<Output = Vec<O>> + std::marker::Send + 'static,
B::Output: std::marker::Send + Clone,
B::Input: std::marker::Send,
T: HttpSink<
Input = PartitionInnerBuffer<B::Input, ApiKey>,
Output = PartitionInnerBuffer<B::Output, ApiKey>,
> + Clone,
{
let request_settings = self.request.unwrap_with(&TowerRequestConfig::default());
) -> crate::Result<VectorSink> {
let default_api_key: Arc<str> = Arc::from(self.default_api_key.clone().as_str());
let request_limits = self.request.unwrap_with(&DEFAULT_REQUEST_LIMITS);
let batch_timeout = self.batch.timeout_secs.map(Duration::from_secs);

let service = ServiceBuilder::new()
.settings(request_limits, LogApiRetry)
.service(LogApiService::new(client, self.get_uri()));
let sink = LogSink::new(service, cx)
.batch_timeout(batch_timeout)
.encoding(self.encoding.clone())
.default_api_key(default_api_key)
.compression(self.compression.unwrap_or_default())
.log_schema(vector_core::config::log_schema())
.build();

Ok(VectorSink::Stream(Box::new(sink)))
}

pub fn build_healthcheck(&self, client: HttpClient) -> crate::Result<Healthcheck> {
let healthcheck = healthcheck(client, self.get_uri(), self.default_api_key.clone()).boxed();
Ok(healthcheck)
}

pub fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
let tls_settings = MaybeTlsSettings::from_config(
&Some(self.tls.clone().unwrap_or_else(TlsConfig::enabled)),
false,
)?;

let client = HttpClient::new(tls_settings, cx.proxy())?;
let healthcheck = healthcheck(
service.clone(),
client.clone(),
self.default_api_key.clone(),
)
.boxed();
let sink = PartitionHttpSink::new(
service,
PartitionBuffer::new(batch),
request_settings,
timeout,
client,
cx.acker(),
)
.sink_map_err(|error| error!(message = "Fatal datadog_logs text sink error.", %error));
let sink = VectorSink::Sink(Box::new(sink));

Ok((sink, healthcheck))
Ok(HttpClient::new(tls_settings, proxy)?)
Member: The `Ok(X?)` form here seems redundant. Can this be done without the `?`?

Contributor (author): The types don't line up properly. The `crate::Result` requires a boxed dynamic instance of `StdError + Send + Sync + 'static` and the client returns an `http::HttpError`.

Member: Yeah, then it becomes `HttpClient::new(tls_settings, proxy).map_err(Into::into)`. Tomayto, tomahto.

Contributor (author): What I'd really like to see -- and I think it's possible once we have our first pass over the sinks done -- is to make the error generic in the topology so we can drop the boxing.

}
}
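
The exchange above is about two equivalent spellings of the same error conversion. A self-contained sketch, with hypothetical `HttpError` and `CrateResult` types standing in for Vector's actual ones:

```rust
use std::fmt;

// Hypothetical concrete error, standing in for `http::HttpError`.
#[derive(Debug)]
struct HttpError(String);

impl fmt::Display for HttpError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "http error: {}", self.0)
    }
}

impl std::error::Error for HttpError {}

// Stand-in for `crate::Result`: a boxed dynamic error.
type CrateResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync + 'static>>;

// Stand-in for `HttpClient::new`, returning the concrete error type.
fn new_client(fail: bool) -> Result<&'static str, HttpError> {
    if fail {
        Err(HttpError("connection refused".into()))
    } else {
        Ok("client")
    }
}

// Style 1: `Ok(x?)` — the `?` performs the `HttpError -> Box<dyn Error>`
// conversion via `From`, then the value is re-wrapped in `Ok`.
fn create_client_q(fail: bool) -> CrateResult<&'static str> {
    Ok(new_client(fail)?)
}

// Style 2: `map_err(Into::into)` — the same conversion, spelled explicitly.
fn create_client_map(fail: bool) -> CrateResult<&'static str> {
    new_client(fail).map_err(Into::into)
}
```

Both compile to the same behavior; which to use is purely a style question, as the reviewers conclude.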

#[async_trait::async_trait]
#[typetag::serde(name = "datadog_logs")]
impl SinkConfig for DatadogLogsConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let batch_settings = self.batch_settings()?;
let service = service::Service::builder()
.encoding(self.encoding.clone())
.compression(self.compression.unwrap_or_default())
.uri(self.get_uri())
.default_api_key(Arc::from(self.default_api_key.clone()))
.log_schema(vector_core::config::log_schema())
.build();
self.build_sink(
cx,
service,
JsonArrayBuffer::new(batch_settings.size),
batch_settings.timeout,
)
let client = self.create_client(&cx.proxy)?;
let healthcheck = self.build_healthcheck(client.clone())?;
let sink = self.build_processor(client, cx)?;
Ok((sink, healthcheck))
}

fn input_type(&self) -> DataType {
34 changes: 12 additions & 22 deletions src/sinks/datadog/logs/healthcheck.rs
@@ -1,30 +1,20 @@
use super::ApiKey;
use crate::{
http::HttpClient,
sinks::util::{http::HttpSink, PartitionInnerBuffer},
};
use http::StatusCode;
use crate::http::HttpClient;
use http::{Request, Response, StatusCode, Uri};
use hyper::body::Body;
use std::sync::Arc;

/// The healthcheck is performed by sending an empty request to Datadog and
/// checking the return.
pub async fn healthcheck<T, O>(sink: T, client: HttpClient, api_key: String) -> crate::Result<()>
where
T: HttpSink<Output = PartitionInnerBuffer<Vec<O>, ApiKey>>,
{
let req = sink
.build_request(PartitionInnerBuffer::new(
Vec::with_capacity(0),
Arc::from(api_key),
))
.await?
.map(Body::from);
pub async fn healthcheck(client: HttpClient, uri: Uri, api_key: String) -> crate::Result<()> {
let body = vec![];
let request: Request<Body> = Request::post(uri)
.header("Content-Type", "application/json")
.header("DD-API-KEY", &api_key[..])
.header("Content-Length", body.len())
.body(Body::from(body))?;
let response: Response<Body> = client.send(request).await?;

let res = client.send(req).await?;

let status = res.status();
let body = hyper::body::to_bytes(res.into_body()).await?;
let status = response.status();
let body = hyper::body::to_bytes(response.into_body()).await?;

match status {
StatusCode::OK => Ok(()),
18 changes: 17 additions & 1 deletion src/sinks/datadog/logs/mod.rs
@@ -1,11 +1,27 @@
use super::ApiKey;
//! The Datadog Logs [`VectorSink`]
//!
//! This module contains the [`VectorSink`] instance that is responsible for
//! taking a stream of [`Event`] instances and getting them flung out to the
//! Datadog Log API. The log API is relatively generous in terms of its
//! constraints, excepting that:
//!
//! * a 'payload' is comprised of no more than 1,000 array members
//! * a 'payload' may not be more than 5Mb in size, uncompressed and
//! * a 'payload' may not mix API keys
//!
//! Otherwise per [the
//! docs](https://docs.datadoghq.com/api/latest/logs/#send-logs) there aren't
//! other major constraints we have to follow in this implementation. The sink
//! is careful to always send the maximum payload size excepting where we
//! violate the size constraint.

#[cfg(test)]
mod tests;

mod config;
mod healthcheck;
mod service;
mod sink;

use crate::config::SinkDescription;
use crate::sinks::datadog::logs::config::DatadogLogsConfig;