From 380d7adb72a02e8da0af35fd3d80ecb1d8b0b541 Mon Sep 17 00:00:00 2001 From: Alexander Zaitsev Date: Thu, 8 Jun 2023 19:16:28 +0200 Subject: [PATCH] feat(prometheus): add more compression algorithms to Prometheus Remote Write (#17334) Resolves https://github.com/vectordotdev/vector/issues/17199 - add Zstd and Gzip support to Prometheus Remote Write - add a new compression option in the config (Snappy is the default) - update the documentation Tested: - Local build --- src/sinks/prometheus/remote_write.rs | 61 +++++++++++++++++-- .../sinks/base/prometheus_remote_write.cue | 12 ++++ .../sinks/prometheus_remote_write.cue | 9 +++ website/cue/reference/urls.cue | 1 + 4 files changed, 77 insertions(+), 6 deletions(-) diff --git a/src/sinks/prometheus/remote_write.rs b/src/sinks/prometheus/remote_write.rs index 2692253780c5a..027cd19e4d8cd 100644 --- a/src/sinks/prometheus/remote_write.rs +++ b/src/sinks/prometheus/remote_write.rs @@ -1,3 +1,4 @@ +use std::io::Read; use std::sync::Arc; use std::task; @@ -123,10 +124,40 @@ pub struct RemoteWriteConfig { skip_serializing_if = "crate::serde::skip_serializing_if_default" )] pub acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[configurable(metadata(docs::advanced))] + #[serde(default)] + pub compression: Compression, } impl_generate_config_from_default!(RemoteWriteConfig); +/// Supported compression types for Prometheus Remote Write. +#[configurable_component] +#[derive(Clone, Copy, Debug, Derivative)] +#[derivative(Default)] +#[serde(rename_all = "lowercase")] +pub enum Compression { + /// Snappy. + #[derivative(Default)] + Snappy, + + /// Gzip. + Gzip, + + /// Zstandard. + Zstd, +} + +const fn convert_compression_to_content_encoding(compression: Compression) -> &'static str { + match compression { + Compression::Snappy => "snappy", + Compression::Gzip => "gzip", + Compression::Zstd => "zstd", + } +} + #[async_trait::async_trait] impl SinkConfig for RemoteWriteConfig { async fn build( @@ -181,6 +212,7 @@ impl SinkConfig for RemoteWriteConfig { aws_region, credentials_provider, http_auth, + compression: self.compression, }); let healthcheck = healthcheck(client.clone(), Arc::clone(&http_request_builder)).boxed(); @@ -190,6 +222,7 @@ impl SinkConfig for RemoteWriteConfig { buckets, quantiles, http_request_builder, + compression: self.compression, }; let sink = { @@ -277,6 +310,7 @@ struct RemoteWriteService { buckets: Vec, quantiles: Vec, http_request_builder: Arc, + compression: Compression, } impl RemoteWriteService { @@ -312,7 +346,7 @@ impl Service, PartitionKey>> for RemoteWriteSer fn call(&mut self, buffer: PartitionInnerBuffer, PartitionKey>) -> Self::Future { let (events, key) = buffer.into_parts(); let body = self.encode_events(events); - let body = snap_block(body); + let body = compress_block(self.compression, body); let client = self.client.clone(); let request_builder = Arc::clone(&self.http_request_builder); @@ -344,6 +378,7 @@ pub struct HttpRequestBuilder { pub aws_region: Option, pub http_auth: Option, pub credentials_provider: Option, + pub compression: Compression, } impl HttpRequestBuilder { @@ -353,11 +388,13 @@ impl HttpRequestBuilder { body: Vec, tenant_id: Option, ) -> Result, crate::Error> { + let content_encoding = convert_compression_to_content_encoding(self.compression); + let mut builder = http::Request::builder() .method(method) .uri(self.endpoint.clone()) .header("X-Prometheus-Remote-Write-Version", "0.1.0") - .header("Content-Encoding", "snappy") + .header("Content-Encoding", content_encoding) .header("Content-Type", "application/x-protobuf"); if let Some(tenant_id) = &tenant_id { @@ -380,10 +417,22 @@ impl HttpRequestBuilder { } } -fn snap_block(data: Bytes) -> Vec { - snap::raw::Encoder::new() - .compress_vec(&data) - .expect("Out of memory") +fn compress_block(compression: Compression, data: Bytes) -> Vec { + match compression { + Compression::Snappy => snap::raw::Encoder::new() + .compress_vec(&data) + .expect("snap compression failed, please report"), + Compression::Gzip => { + let mut buf = Vec::new(); + flate2::read::GzEncoder::new(data.as_ref(), flate2::Compression::default()) + .read_to_end(&mut buf) + .expect("gzip compression failed, please report"); + buf + } + Compression::Zstd => { + zstd::encode_all(data.as_ref(), 0).expect("zstd compression failed, please report") + } + } } async fn sign_request( diff --git a/website/cue/reference/components/sinks/base/prometheus_remote_write.cue b/website/cue/reference/components/sinks/base/prometheus_remote_write.cue index 96242b171ad24..4089af9b9af7a 100644 --- a/website/cue/reference/components/sinks/base/prometheus_remote_write.cue +++ b/website/cue/reference/components/sinks/base/prometheus_remote_write.cue @@ -223,6 +223,18 @@ base: components: sinks: prometheus_remote_write: configuration: { items: type: float: {} } } + compression: { + description: "Supported compression types for Prometheus Remote Write." + required: false + type: string: { + default: "snappy" + enum: { + gzip: "Gzip." + snappy: "Snappy." + zstd: "Zstandard." + } + } + } default_namespace: { description: """ The default namespace for any metrics sent. diff --git a/website/cue/reference/components/sinks/prometheus_remote_write.cue b/website/cue/reference/components/sinks/prometheus_remote_write.cue index c384571aef6f2..fc16151370419 100644 --- a/website/cue/reference/components/sinks/prometheus_remote_write.cue +++ b/website/cue/reference/components/sinks/prometheus_remote_write.cue @@ -100,5 +100,14 @@ components: sinks: prometheus_remote_write: { values for each name, Vector will only send the last value specified. """ } + compression_schemes: { + title: "Compression schemes" + body: """ + Officially according to the [Prometheus Remote-Write specification](\(urls.prometheus_remote_write_spec)), + the only supported compression scheme is [Snappy](\(urls.snappy)). However, + there are a number of other implementations that do support other schemes. Thus + Vector also supports using Gzip and Zstd. + """ + } } } diff --git a/website/cue/reference/urls.cue b/website/cue/reference/urls.cue index fd9cafc52d6dc..0e3560392c108 100644 --- a/website/cue/reference/urls.cue +++ b/website/cue/reference/urls.cue @@ -412,6 +412,7 @@ urls: { prometheus_remote_integrations: "https://prometheus.io/docs/operating/integrations/#remote-endpoints-and-storage" prometheus_remote_write: "https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write" prometheus_remote_write_protocol: "https://docs.google.com/document/d/1LPhVRSFkGNSuU1fBd81ulhsCPR4hkSZyyBj1SZ8fWOM/edit#heading=h.n0d0vphea3fe" + prometheus_remote_write_spec: "https://prometheus.io/docs/concepts/remote_write_spec/#protocol" protobuf: "https://developers.google.com/protocol-buffers" pulsar: "https://pulsar.apache.org/" pulsar_protocol: "https://pulsar.apache.org/docs/en/develop-binary-protocol/"