From 5c1707f5972ff37d6bcd5782a157afac89efaa3d Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Wed, 25 Oct 2023 10:28:24 +0100 Subject: [PATCH] chore(prometheus_remote_write sink): remote write sink rewrite (#18676) * Refactor prometheus remote write sink Signed-off-by: Stephen Wakely * WIP Signed-off-by: Stephen Wakely * Made tests pass Signed-off-by: Stephen Wakely * Use shared compression Signed-off-by: Stephen Wakely * Clippy Signed-off-by: Stephen Wakely * Some comments Signed-off-by: Stephen Wakely * Use HttpResponse Signed-off-by: Stephen Wakely * Update request builder default Signed-off-by: Stephen Wakely * Update PartitionBatcher to use BatchConfig Signed-off-by: Stephen Wakely * Update PartitionBatcher to use BatchConfig Signed-off-by: Stephen Wakely * Remove zorkwonk Signed-off-by: Stephen Wakely * Make into fns as fns instead Signed-off-by: Stephen Wakely * Spelling Signed-off-by: Stephen Wakely * Don't box the closure Signed-off-by: Stephen Wakely * Clippy Signed-off-by: Stephen Wakely * Only insert timeout if we add the batch to the list Signed-off-by: Stephen Wakely * Allow the timer to remove an item Signed-off-by: Stephen Wakely * Allow partitions to be types other than Vec Signed-off-by: Stephen Wakely * Clippy Signed-off-by: Stephen Wakely * Added test for aggregation Signed-off-by: Stephen Wakely * Allow a custom object to be used for the reducer Signed-off-by: Stephen Wakely * Make aggregating optional Signed-off-by: Stephen Wakely * Adde test for non aggregation Signed-off-by: Stephen Wakely * Clippy Signed-off-by: Stephen Wakely * Component docs Signed-off-by: Stephen Wakely * Feedback from Kyle and Doug Signed-off-by: Stephen Wakely * Use generic compression options Signed-off-by: Stephen Wakely * Default compression to Snappy Signed-off-by: Stephen Wakely * Update docs Signed-off-by: Stephen Wakely * Add snappy to the compression docs Signed-off-by: Stephen Wakely * Remove proptest file Signed-off-by: Stephen Wakely * Snappy is no longer optional Signed-off-by: Stephen Wakely --------- Signed-off-by: Stephen Wakely --- Cargo.toml | 8 +- lib/vector-stream/src/partitioned_batcher.rs | 55 +- src/sinks/prometheus/remote_write.rs | 790 ------------------ src/sinks/prometheus/remote_write/config.rs | 229 +++++ .../remote_write/integration_tests.rs | 109 +++ src/sinks/prometheus/remote_write/mod.rs | 48 ++ .../remote_write/request_builder.rs | 131 +++ src/sinks/prometheus/remote_write/service.rs | 139 +++ src/sinks/prometheus/remote_write/sink.rs | 233 ++++++ src/sinks/prometheus/remote_write/tests.rs | 290 +++++++ src/sinks/util/buffer/compression.rs | 34 +- src/sinks/util/buffer/metrics/normalize.rs | 12 +- src/sinks/util/buffer/mod.rs | 9 + src/sinks/util/builder.rs | 4 +- src/sinks/util/compressor.rs | 11 +- src/sinks/util/http.rs | 4 +- src/sinks/util/mod.rs | 1 + src/sinks/util/snappy.rs | 88 ++ src/sinks/vector/sink.rs | 8 +- src/sources/prometheus/remote_write.rs | 214 +++-- .../components/sinks/base/appsignal.cue | 5 + .../sinks/base/aws_cloudwatch_logs.cue | 5 + .../sinks/base/aws_cloudwatch_metrics.cue | 5 + .../sinks/base/aws_kinesis_firehose.cue | 5 + .../sinks/base/aws_kinesis_streams.cue | 5 + .../components/sinks/base/aws_s3.cue | 5 + .../reference/components/sinks/base/axiom.cue | 5 + .../components/sinks/base/azure_blob.cue | 5 + .../components/sinks/base/clickhouse.cue | 5 + .../components/sinks/base/datadog_logs.cue | 5 + .../components/sinks/base/datadog_traces.cue | 5 + .../components/sinks/base/elasticsearch.cue | 5 + .../sinks/base/gcp_cloud_storage.cue | 5 + .../reference/components/sinks/base/http.cue | 5 + .../components/sinks/base/humio_logs.cue | 5 + .../components/sinks/base/humio_metrics.cue | 5 + .../components/sinks/base/new_relic.cue | 5 + .../sinks/base/prometheus_remote_write.cue | 39 +- .../components/sinks/base/splunk_hec_logs.cue | 5 + .../sinks/base/splunk_hec_metrics.cue | 5 + .../components/sinks/base/webhdfs.cue | 5 + 41 files changed, 1597 insertions(+), 959 deletions(-) delete mode 100644 src/sinks/prometheus/remote_write.rs create mode 100644 src/sinks/prometheus/remote_write/config.rs create mode 100644 src/sinks/prometheus/remote_write/integration_tests.rs create mode 100644 src/sinks/prometheus/remote_write/mod.rs create mode 100644 src/sinks/prometheus/remote_write/request_builder.rs create mode 100644 src/sinks/prometheus/remote_write/service.rs create mode 100644 src/sinks/prometheus/remote_write/sink.rs create mode 100644 src/sinks/prometheus/remote_write/tests.rs create mode 100644 src/sinks/util/snappy.rs diff --git a/Cargo.toml b/Cargo.toml index cf794391c783c..3a5b607be302d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -316,7 +316,7 @@ seahash = { version = "4.1.0", default-features = false } semver = { version = "1.0.20", default-features = false, features = ["serde", "std"], optional = true } smallvec = { version = "1", default-features = false, features = ["union", "serde"] } snafu = { version = "0.7.5", default-features = false, features = ["futures"] } -snap = { version = "1.1.0", default-features = false, optional = true } +snap = { version = "1.1.0", default-features = false } socket2 = { version = "0.5.5", default-features = false } stream-cancel = { version = "0.8.1", default-features = false } strip-ansi-escapes = { version = "0.2.0", default-features = false } @@ -560,9 +560,9 @@ sources-splunk_hec = ["dep:roaring"] sources-statsd = ["sources-utils-net", "tokio-util/net"] sources-stdin = ["tokio-util/io"] sources-syslog = ["codecs-syslog", "sources-utils-net", "tokio-util/net"] -sources-utils-http = ["dep:snap", "sources-utils-http-auth", "sources-utils-http-encoding", "sources-utils-http-error", "sources-utils-http-prelude"] +sources-utils-http = ["sources-utils-http-auth", "sources-utils-http-encoding", "sources-utils-http-error", "sources-utils-http-prelude"] sources-utils-http-auth = ["sources-utils-http-error"] -sources-utils-http-encoding = ["dep:snap", "sources-utils-http-error"] +sources-utils-http-encoding = ["sources-utils-http-error"] sources-utils-http-error = [] sources-utils-http-prelude = ["sources-utils-http", "sources-utils-http-auth", "sources-utils-http-encoding", "sources-utils-http-error"] sources-utils-http-query = [] @@ -715,7 +715,7 @@ sinks-nats = ["dep:async-nats", "dep:nkeys"] sinks-new_relic_logs = ["sinks-http"] sinks-new_relic = [] sinks-papertrail = ["dep:syslog"] -sinks-prometheus = ["dep:base64", "dep:prometheus-parser", "dep:snap"] +sinks-prometheus = ["dep:base64", "dep:prometheus-parser"] sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"] sinks-redis = ["dep:redis"] sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"] diff --git a/lib/vector-stream/src/partitioned_batcher.rs b/lib/vector-stream/src/partitioned_batcher.rs index 0d73cc5267098..b865216c1f764 100644 --- a/lib/vector-stream/src/partitioned_batcher.rs +++ b/lib/vector-stream/src/partitioned_batcher.rs @@ -15,7 +15,7 @@ use vector_core::{partition::Partitioner, time::KeyedTimer, ByteSizeOf}; use crate::batcher::{ config::BatchConfigParts, - data::BatchReduce, + data::BatchData, limiter::{ByteSizeOfItemSize, ItemBatchSize, SizeLimit}, BatchConfig, }; @@ -155,16 +155,15 @@ impl BatcherSettings { } /// A batcher config using the `ItemBatchSize` trait to determine batch sizes. - /// The output is built with the supplied reducer function. - pub fn into_reducer_config( - self, + /// The output is built with the supplied object implementing [`BatchData`]. + pub fn as_reducer_config( + &self, item_size: I, - reducer: F, - ) -> BatchConfigParts, BatchReduce> + reducer: B, + ) -> BatchConfigParts, B> where I: ItemBatchSize, - F: FnMut(&mut S, T), - S: Default, + B: BatchData, { BatchConfigParts { batch_limiter: SizeLimit { @@ -173,14 +172,14 @@ impl BatcherSettings { current_size: 0, item_size_calculator: item_size, }, - batch_data: BatchReduce::new(reducer), + batch_data: reducer, timeout: self.timeout, } } } #[pin_project] -pub struct PartitionedBatcher +pub struct PartitionedBatcher where Prt: Partitioner, { @@ -193,7 +192,7 @@ where /// The store of 'closed' batches. When this is not empty it will be /// preferentially flushed prior to consuming any new items from the /// underlying stream. - closed_batches: Vec<(Prt::Key, Vec)>, + closed_batches: Vec<(Prt::Key, B)>, /// The queue of pending batch expirations timer: KT, /// The partitioner for this `Batcher` @@ -203,7 +202,7 @@ where stream: Fuse, } -impl PartitionedBatcher, C, F> +impl PartitionedBatcher, C, F, B> where St: Stream, Prt: Partitioner + Unpin, @@ -226,7 +225,7 @@ where } #[cfg(test)] -impl PartitionedBatcher +impl PartitionedBatcher where St: Stream, Prt: Partitioner + Unpin, @@ -247,17 +246,17 @@ where } } -impl Stream for PartitionedBatcher +impl Stream for PartitionedBatcher where St: Stream, Prt: Partitioner + Unpin, Prt::Key: Eq + Hash + Clone, Prt::Item: ByteSizeOf, KT: KeyedTimer, - C: BatchConfig>, + C: BatchConfig, F: Fn() -> C + Send, { - type Item = (Prt::Key, Vec); + type Item = (Prt::Key, B); fn size_hint(&self) -> (usize, Option) { self.stream.size_hint() @@ -270,20 +269,18 @@ where return Poll::Ready(this.closed_batches.pop()); } match this.stream.as_mut().poll_next(cx) { - Poll::Pending => { - match this.timer.poll_expired(cx) { - // Unlike normal streams, `DelayQueue` can return `None` - // here but still be usable later if more entries are added. - Poll::Pending | Poll::Ready(None) => return Poll::Pending, - Poll::Ready(Some(item_key)) => { - let mut batch = this - .batches - .remove(&item_key) - .expect("batch should exist if it is set to expire"); - this.closed_batches.push((item_key, batch.take_batch())); - } + Poll::Pending => match this.timer.poll_expired(cx) { + // Unlike normal streams, `DelayQueue` can return `None` + // here but still be usable later if more entries are added. + Poll::Pending | Poll::Ready(None) => return Poll::Pending, + Poll::Ready(Some(item_key)) => { + let mut batch = this + .batches + .remove(&item_key) + .expect("batch should exist if it is set to expire"); + this.closed_batches.push((item_key, batch.take_batch())); } - } + }, Poll::Ready(None) => { // Now that the underlying stream is closed, we need to // clear out our batches, including all expiration diff --git a/src/sinks/prometheus/remote_write.rs b/src/sinks/prometheus/remote_write.rs deleted file mode 100644 index 043254630b63e..0000000000000 --- a/src/sinks/prometheus/remote_write.rs +++ /dev/null @@ -1,790 +0,0 @@ -use std::io::Read; -use std::sync::Arc; -use std::task; - -#[cfg(feature = "aws-core")] -use aws_credential_types::provider::SharedCredentialsProvider; -#[cfg(feature = "aws-core")] -use aws_types::region::Region; -use bytes::{Bytes, BytesMut}; -use futures::{future::BoxFuture, stream, FutureExt, SinkExt}; -use http::{Request, Uri}; -use prost::Message; -use snafu::{ResultExt, Snafu}; -use tower::Service; -use vector_config::configurable_component; -use vector_core::{ByteSizeOf, EstimatedJsonEncodedSizeOf}; - -use super::collector::{self, MetricCollector as _}; -use crate::{ - config::{self, AcknowledgementsConfig, Input, SinkConfig}, - event::{Event, Metric}, - http::HttpClient, - internal_events::{EndpointBytesSent, TemplateRenderingError}, - sinks::{ - self, - prometheus::PrometheusRemoteWriteAuth, - util::{ - auth::Auth, - batch::BatchConfig, - buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer}, - http::HttpRetryLogic, - uri, EncodedEvent, PartitionBuffer, PartitionInnerBuffer, SinkBatchSettings, - TowerRequestConfig, - }, - }, - template::Template, - tls::{TlsConfig, TlsSettings}, -}; - -#[derive(Clone, Copy, Debug, Default)] -pub struct PrometheusRemoteWriteDefaultBatchSettings; - -impl SinkBatchSettings for PrometheusRemoteWriteDefaultBatchSettings { - const MAX_EVENTS: Option = Some(1_000); - const MAX_BYTES: Option = None; - const TIMEOUT_SECS: f64 = 1.0; -} - -#[derive(Debug, Snafu)] -enum Errors { - #[snafu(display(r#"Prometheus remote_write sink cannot accept "set" metrics"#))] - SetMetricInvalid, - #[cfg(feature = "aws-core")] - #[snafu(display("aws.region required when AWS authentication is in use"))] - AwsRegionRequired, -} - -/// Configuration for the `prometheus_remote_write` sink. -#[configurable_component(sink( - "prometheus_remote_write", - "Deliver metric data to a Prometheus remote write endpoint." -))] -#[derive(Clone, Debug, Default)] -#[serde(deny_unknown_fields)] -pub struct RemoteWriteConfig { - /// The endpoint to send data to. - /// - /// The endpoint should include the scheme and the path to write to. - #[configurable(metadata(docs::examples = "https://localhost:8087/api/v1/write"))] - pub endpoint: String, - - /// The default namespace for any metrics sent. - /// - /// This namespace is only used if a metric has no existing namespace. When a namespace is - /// present, it is used as a prefix to the metric name, and separated with an underscore (`_`). - /// - /// It should follow the Prometheus [naming conventions][prom_naming_docs]. - /// - /// [prom_naming_docs]: https://prometheus.io/docs/practices/naming/#metric-names - #[configurable(metadata(docs::examples = "service"))] - #[configurable(metadata(docs::advanced))] - pub default_namespace: Option, - - /// Default buckets to use for aggregating [distribution][dist_metric_docs] metrics into histograms. - /// - /// [dist_metric_docs]: https://vector.dev/docs/about/under-the-hood/architecture/data-model/metric/#distribution - #[serde(default = "super::default_histogram_buckets")] - #[configurable(metadata(docs::advanced))] - pub buckets: Vec, - - /// Quantiles to use for aggregating [distribution][dist_metric_docs] metrics into a summary. - /// - /// [dist_metric_docs]: https://vector.dev/docs/about/under-the-hood/architecture/data-model/metric/#distribution - #[serde(default = "super::default_summary_quantiles")] - #[configurable(metadata(docs::advanced))] - pub quantiles: Vec, - - #[configurable(derived)] - #[serde(default)] - pub batch: BatchConfig, - - #[configurable(derived)] - #[serde(default)] - pub request: TowerRequestConfig, - - /// The tenant ID to send. - /// - /// If set, a header named `X-Scope-OrgID` is added to outgoing requests with the value of this setting. - /// - /// This may be used by Cortex or other remote services to identify the tenant making the request. - #[serde(default)] - #[configurable(metadata(docs::examples = "my-domain"))] - #[configurable(metadata(docs::advanced))] - pub tenant_id: Option