Initial work to include layers on client
Signed-off-by: Brian L. Troutwine <brian@troutwine.us>
blt committed Aug 25, 2021
1 parent b34b3eb commit fef11fd
Showing 7 changed files with 302 additions and 264 deletions.
44 changes: 17 additions & 27 deletions src/sinks/datadog/logs/config.rs
```diff
@@ -1,6 +1,6 @@
 use crate::config::{DataType, GenerateConfig, SinkConfig, SinkContext};
 use crate::http::HttpClient;
-use crate::sinks::datadog::logs::sink::LogApi;
+use crate::sinks::datadog::logs::log_api::LogApi;
 use crate::sinks::datadog::Region;
 use crate::sinks::util::encoding::EncodingConfigWithDefault;
 use crate::sinks::util::{BatchConfig, Compression, TowerRequestConfig};
@@ -10,31 +10,7 @@ use futures::FutureExt;
 use indoc::indoc;
 use serde::{Deserialize, Serialize};
 use std::convert::TryFrom;
-
-// What is important here? I have to have a solution that satisfies the
-// `BatchConfig` passed in. The final sink-type thing must fit into a
-// `VectorSink`. The `BatchConfig` has two important features:
-//
-// * `max_bytes` -- the total number of bytes to batch up before processing
-// * `timeout_secs` -- the maximum number of seconds to hold events before processing
-//
-// This sink does not obey `max_events`.
-//
-// Now, the datadog logs endpoint has some limitations to be aware of. First, a
-// payload is defined by API key. That is, ultimately, if I have events
-// associated with N API keys, then even if the sum of all the events is under
-// the logs API payload limit, I have N payloads to get out. Also, a payload
-// may be no more than 5Mb and the interior array may have no more than 1000
-// members. There is no limit on the number of requests that can be made in a
-// given interval of time.
-//
-// Okay, so, for each API key, buffer up to 1_000 events under that API key _or_
-// in the event of a timeout (global) _or_ in the event that the size of the
-// buffered events exceeds `max_bytes` (global), kick out a request to the
-// datadog API. We will take the byte size of the `Event` -- since that's what
-// impacts the user -- and NOT its serialized size. As such we'll have to take
-// care to avoid serialization that goes over the prescribed size limit to the
-// API.
+use tower::ServiceBuilder;
 
 #[derive(Deserialize, Serialize, Debug, Clone)]
 #[serde(deny_unknown_fields)]
```
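The comment deleted above still captures the batching rules this sink has to satisfy. As a rough illustration of those rules -- not Vector's actual implementation -- here is a minimal sketch of per-API-key buffering with the two size-based flush triggers. `Event`, `Batcher`, and `size_of` are hypothetical stand-ins; the global timeout flush would be driven by a timer elsewhere, and the 5Mb per-payload cap is not enforced here.

```rust
use std::collections::HashMap;

/// Per-payload member limit of the Datadog logs API, per the comment above.
const MAX_PAYLOAD_ARRAY_LEN: usize = 1_000;

/// Hypothetical event type; `size_of` stands in for the in-memory byte size
/// of an `Event`, which (per the comment) drives flushing, NOT the
/// serialized size.
struct Event {
    bytes: usize,
}

impl Event {
    fn size_of(&self) -> usize {
        self.bytes
    }
}

/// Buffers events per API key and reports which buffers are ready to ship.
struct Batcher {
    buffers: HashMap<String, Vec<Event>>,
    total_bytes: usize,
    max_bytes: usize,
}

impl Batcher {
    fn new(max_bytes: usize) -> Self {
        Batcher {
            buffers: HashMap::new(),
            total_bytes: 0,
            max_bytes,
        }
    }

    /// Accepts one event; returns any (api_key, events) payloads now due.
    /// The global timeout flush is handled by a timer outside this sketch.
    fn push(&mut self, api_key: String, event: Event) -> Vec<(String, Vec<Event>)> {
        self.total_bytes += event.size_of();
        let buffer = self.buffers.entry(api_key.clone()).or_default();
        buffer.push(event);

        let mut ready = Vec::new();
        if buffer.len() >= MAX_PAYLOAD_ARRAY_LEN {
            // Payloads never mix API keys, so this key flushes alone.
            let events = self.buffers.remove(&api_key).unwrap();
            self.total_bytes -= events.iter().map(Event::size_of).sum::<usize>();
            ready.push((api_key, events));
        } else if self.total_bytes >= self.max_bytes {
            // The byte limit is global, so every key's buffer flushes.
            self.total_bytes = 0;
            ready.extend(self.buffers.drain());
        }
        ready
    }
}
```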
```diff
@@ -115,8 +91,22 @@ impl SinkConfig for DatadogLogsConfig {
             false,
         )?;
 
+        let request_settings = self.request.unwrap_with(&TowerRequestConfig::default());
         let client = HttpClient::new(tls_settings, cx.proxy())?;
+        // let client = ServiceBuilder::new().concurrency_limit(100).service(client);
+        let client = ServiceBuilder::new()
+            .rate_limit(
+                request_settings.rate_limit_num,
+                request_settings.rate_limit_duration,
+            )
+            // TODO need whatever `retry_logic` was previously
+            // .retry(request_settings.retry_policy
+            // .layer(AdaptiveConcurrencyLimitLayer::new(
+            //     self.concurrency,
+            //     self.adaptive_concurrency,
+            //     retry_logic,
+            // ))
+            .timeout(request_settings.timeout)
+            .service(client);
 
         // let healthcheck = healthcheck(
         //     service.clone(),
```
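For context on the new layering, here is a self-contained sketch of what `tower::ServiceBuilder` does with `rate_limit` and `timeout`, the two layers this commit applies around the HTTP client. The inner `service_fn` is a stand-in for Vector's `HttpClient`; this assumes the `tower` crate with its `limit`, `timeout`, and `util` features and a tokio runtime.

```rust
use std::time::Duration;
use tower::{Service, ServiceBuilder, ServiceExt};

#[tokio::main]
async fn main() {
    // Stand-in for Vector's `HttpClient`: a service that echoes its request.
    let inner = tower::service_fn(|req: String| async move {
        Ok::<_, std::convert::Infallible>(req)
    });

    // Layers listed first wrap layers listed later, so the rate limiter sits
    // outside the timeout.
    let mut svc = ServiceBuilder::new()
        .rate_limit(5, Duration::from_secs(1)) // at most 5 calls per second
        .timeout(Duration::from_secs(60)) // each call must finish within 60s
        .service(inner);

    let resp = svc.ready().await.unwrap().call("payload".into()).await;
    println!("{:?}", resp); // Ok("payload")
}
```

One consequence of this ordering: a request waits for rate-limit capacity in `ready()` before `call()` begins, so time spent queued behind the rate limiter does not count against the timeout.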
