Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(datadog_metrics sink): incrementally encode sketches #17764

Merged
merged 7 commits into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ serde_yaml = { version = "0.9.22", default-features = false }
rmp-serde = { version = "1.1.1", default-features = false, optional = true }
rmpv = { version = "1.0.0", default-features = false, features = ["with-serde"], optional = true }

# Prost
# Prost / Protocol Buffers
prost = { version = "0.11", default-features = false, features = ["std"] }
prost-reflect = { version = "0.11", default-features = false, optional = true }
prost-types = { version = "0.11", default-features = false, optional = true }

# GCP
Expand Down Expand Up @@ -673,7 +674,7 @@ sinks-databend = []
sinks-datadog_archives = ["sinks-aws_s3", "sinks-azure_blob", "sinks-gcp"]
sinks-datadog_events = []
sinks-datadog_logs = []
sinks-datadog_metrics = ["protobuf-build"]
sinks-datadog_metrics = ["protobuf-build", "dep:prost-reflect"]
sinks-datadog_traces = ["protobuf-build", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"]
sinks-elasticsearch = ["aws-core", "transforms-metric_to_log"]
sinks-file = ["dep:async-compression"]
Expand Down
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ proc-macro2,https://github.com/dtolnay/proc-macro2,MIT OR Apache-2.0,"David Toln
proptest,https://github.com/proptest-rs/proptest,MIT OR Apache-2.0,Jason Lingle
prost,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert <dan@danburkert.com>, Lucio Franco <luciofranco14@gmail.com, Tokio Contributors <team@tokio.rs>"
prost-derive,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert <dan@danburkert.com>, Lucio Franco <luciofranco14@gmail.com>, Tokio Contributors <team@tokio.rs>"
prost-reflect,https://github.com/andrewhickman/prost-reflect,MIT OR Apache-2.0,Andrew Hickman <andrew.hickman1@sky.com>
ptr_meta,https://github.com/djkoloski/ptr_meta,MIT,David Koloski <djkoloski@gmail.com>
pulsar,https://github.com/streamnative/pulsar-rs,MIT OR Apache-2.0,"Colin Stearns <cstearns@developers.wyyerd.com>, Kevin Stenerson <kstenerson@developers.wyyerd.com>, Geoffroy Couprie <contact@geoffroycouprie.com>"
quad-rand,https://github.com/not-fl3/quad-rand,MIT,not-fl3 <not.fl3@gmail.com>
Expand Down
22 changes: 20 additions & 2 deletions build.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
use std::{collections::HashSet, env, fs::File, io::Write, path::Path, process::Command};
use std::{
collections::HashSet,
env,
fs::File,
io::Write,
path::{Path, PathBuf},
process::Command,
};

struct TrackedEnv {
tracked: HashSet<String>,
Expand Down Expand Up @@ -124,8 +131,19 @@ fn main() {
println!("cargo:rerun-if-changed=proto/google/rpc/status.proto");
println!("cargo:rerun-if-changed=proto/vector.proto");

// Create and store the "file descriptor set" from the compiled Protocol Buffers packages.
// This allows us to use runtime reflection to manually build Protocol Buffers payloads
// in a type-safe way, which is necessary for incrementally building certain payloads, like
// the ones generated in the `datadog_metrics` sink.
let protobuf_fds_path = std::env::var("OUT_DIR")
.map(PathBuf::from)
.map(|path| path.join("protobuf-fds.bin"))
.expect("OUT_DIR environment variable not set");
tobz marked this conversation as resolved.
Show resolved Hide resolved

let mut prost_build = prost_build::Config::new();
prost_build.btree_map(["."]);
prost_build
.btree_map(["."])
.file_descriptor_set_path(protobuf_fds_path);

tonic_build::configure()
.protoc_arg("--experimental_allow_proto3_optional")
Expand Down
14 changes: 6 additions & 8 deletions src/internal_events/datadog_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@ use vector_common::internal_event::{
};

#[derive(Debug)]
pub struct DatadogMetricsEncodingError {
pub error_message: &'static str,
pub struct DatadogMetricsEncodingError<'a> {
pub reason: &'a str,
pub error_code: &'static str,
pub dropped_events: usize,
}

impl InternalEvent for DatadogMetricsEncodingError {
impl<'a> InternalEvent for DatadogMetricsEncodingError<'a> {
fn emit(self) {
let reason = "Failed to encode Datadog metrics.";
error!(
message = reason,
error = %self.error_message,
error_code = %self.error_code,
message = self.reason,
error_code = self.error_code,
error_type = error_type::ENCODER_FAILED,
intentional = "false",
stage = error_stage::PROCESSING,
Expand All @@ -35,7 +33,7 @@ impl InternalEvent for DatadogMetricsEncodingError {
if self.dropped_events > 0 {
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
count: self.dropped_events,
reason,
reason: self.reason,
});
}
}
Expand Down
5 changes: 0 additions & 5 deletions src/proto.rs

This file was deleted.

20 changes: 20 additions & 0 deletions src/proto/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#[cfg(any(feature = "sources-vector", feature = "sinks-vector"))]
use crate::event::proto as event;

#[cfg(any(feature = "sources-vector", feature = "sinks-vector"))]
pub mod vector;

#[cfg(feature = "sinks-datadog_metrics")]
pub mod fds {
use once_cell::sync::OnceCell;
use prost_reflect::DescriptorPool;

static PROTOBUF_FDS: OnceCell<DescriptorPool> = OnceCell::new();
tobz marked this conversation as resolved.
Show resolved Hide resolved

pub fn get_protobuf_descriptors() -> &'static DescriptorPool {
tobz marked this conversation as resolved.
Show resolved Hide resolved
PROTOBUF_FDS.get_or_init(|| {
DescriptorPool::decode(include_bytes!(concat!(env!("OUT_DIR"), "/protobuf-fds.bin")).as_ref())
.expect("should not fail to decode protobuf file descriptor set generated from build script")
})
}
}
5 changes: 5 additions & 0 deletions src/sinks/datadog/metrics/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ impl DatadogMetricsEndpoint {
DatadogMetricsEndpoint::Sketches => "application/x-protobuf",
}
}

// Gets whether or not this is a series endpoint.
pub const fn is_series(self) -> bool {
matches!(self, Self::Series)
}
}

/// Maps Datadog metric endpoints to their actual URI.
Expand Down
Loading