Skip to content

Commit

Permalink
Split AppSignal sink into separate modules
Browse files Browse the repository at this point in the history
As per review feedback: split the new sink style into separate module
files.
  • Loading branch information
tombruijn committed Aug 15, 2023
1 parent 81cf6a7 commit 230f005
Show file tree
Hide file tree
Showing 8 changed files with 565 additions and 498 deletions.
199 changes: 199 additions & 0 deletions src/sinks/appsignal/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
use futures::FutureExt;
use http::{header::AUTHORIZATION, Request, Uri};
use hyper::Body;
use tower::ServiceBuilder;
use vector_common::sensitive_string::SensitiveString;
use vector_config::configurable_component;
use vector_core::{
config::{proxy::ProxyConfig, AcknowledgementsConfig, DataType, Input},
tls::{MaybeTlsSettings, TlsEnableableConfig},
};

use crate::{
codecs::Transformer,
http::HttpClient,
sinks::{
prelude::{SinkConfig, SinkContext},
util::{
http::HttpStatusRetryLogic, BatchConfig, Compression, ServiceBuilderExt,
SinkBatchSettings, TowerRequestConfig,
},
BuildError, Healthcheck, HealthcheckError, VectorSink,
},
};

use super::{
service::{AppsignalResponse, AppsignalService},
sink::AppsignalSink,
};

/// Configuration for the `appsignal` sink.
#[configurable_component(sink("appsignal", "Deliver log and metric event data to AppSignal."))]
#[derive(Clone, Debug, Default)]
pub struct AppsignalConfig {
/// The URI for the AppSignal API to send data to.
#[configurable(validation(format = "uri"))]
#[configurable(metadata(docs::examples = "https://appsignal-endpoint.net"))]
#[serde(default = "default_endpoint")]
pub endpoint: String,

/// A valid app-level AppSignal Push API key.
#[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
#[configurable(metadata(docs::examples = "${APPSIGNAL_PUSH_API_KEY}"))]
push_api_key: SensitiveString,

#[configurable(derived)]
#[serde(default = "Compression::gzip_default")]
compression: Compression,

#[configurable(derived)]
#[serde(default)]
batch: BatchConfig<AppsignalDefaultBatchSettings>,

#[configurable(derived)]
#[serde(default)]
request: TowerRequestConfig,

#[configurable(derived)]
tls: Option<TlsEnableableConfig>,

#[configurable(derived)]
#[serde(
default,
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
encoding: Transformer,

#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
acknowledgements: AcknowledgementsConfig,
}

pub(crate) fn default_endpoint() -> String {
"https://appsignal-endpoint.net".to_string()
}

#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct AppsignalDefaultBatchSettings;

impl SinkBatchSettings for AppsignalDefaultBatchSettings {
const MAX_EVENTS: Option<usize> = Some(100);
const MAX_BYTES: Option<usize> = Some(450_000);
const TIMEOUT_SECS: f64 = 1.0;
}

impl AppsignalConfig {
pub(crate) fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
let tls = MaybeTlsSettings::from_config(&self.tls, false)?;
let client = HttpClient::new(tls, proxy)?;
Ok(client)
}

pub(crate) fn build_sink(&self, http_client: HttpClient) -> crate::Result<VectorSink> {
let batch_settings = self.batch.into_batcher_settings()?;

let endpoint = endpoint_uri(&self.endpoint, "vector/events")?;
let push_api_key = self.push_api_key.clone();
let compression = self.compression;
let service = AppsignalService::new(http_client, endpoint, push_api_key, compression);

let request_opts = self.request;
let request_settings = request_opts.unwrap_with(&TowerRequestConfig::default());
let retry_logic = HttpStatusRetryLogic::new(|req: &AppsignalResponse| req.http_status);

let service = ServiceBuilder::new()
.settings(request_settings, retry_logic)
.service(service);

let transformer = self.encoding.clone();
let sink = AppsignalSink {
service,
compression,
transformer,
batch_settings,
};

Ok(VectorSink::from_event_streamsink(sink))
}
}

impl_generate_config_from_default!(AppsignalConfig);

#[async_trait::async_trait]
#[typetag::serde(name = "appsignal")]
impl SinkConfig for AppsignalConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let client = self.build_client(cx.proxy())?;
let healthcheck = healthcheck(
endpoint_uri(&self.endpoint, "vector/healthcheck")?,
self.push_api_key.inner().to_string(),
client.clone(),
)
.boxed();
let sink = self.build_sink(client)?;

Ok((sink, healthcheck))
}

fn input(&self) -> Input {
Input::new(DataType::Metric | DataType::Log)
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
&self.acknowledgements
}
}

async fn healthcheck(uri: Uri, push_api_key: String, client: HttpClient) -> crate::Result<()> {
let request = Request::get(uri).header(AUTHORIZATION, format!("Bearer {}", push_api_key));
let response = client.send(request.body(Body::empty()).unwrap()).await?;

match response.status() {
status if status.is_success() => Ok(()),
other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
}
}

pub fn endpoint_uri(endpoint: &str, path: &str) -> crate::Result<Uri> {
let uri = if endpoint.ends_with('/') {
format!("{endpoint}{path}")
} else {
format!("{endpoint}/{path}")
};
match uri.parse::<Uri>() {
Ok(u) => Ok(u),
Err(e) => Err(Box::new(BuildError::UriParseError { source: e })),
}
}

#[cfg(test)]
mod test {
use super::{endpoint_uri, AppsignalConfig};

#[test]
fn generate_config() {
crate::test_util::test_generate_config::<AppsignalConfig>();
}

#[test]
fn endpoint_uri_with_path() {
let uri = endpoint_uri("https://appsignal-endpoint.net", "vector/events");
assert_eq!(
uri.expect("Not a valid URI").to_string(),
"https://appsignal-endpoint.net/vector/events"
);
}

#[test]
fn endpoint_uri_with_trailing_slash() {
let uri = endpoint_uri("https://appsignal-endpoint.net/", "vector/events");
assert_eq!(
uri.expect("Not a valid URI").to_string(),
"https://appsignal-endpoint.net/vector/events"
);
}
}
52 changes: 52 additions & 0 deletions src/sinks/appsignal/encoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use serde_json::{json, Value};
use vector_common::request_metadata::GroupedCountByteSize;
use vector_core::{config::telemetry, event::Event, EstimatedJsonEncodedSizeOf};

use crate::{
codecs::Transformer,
sinks::util::encoding::{as_tracked_write, Encoder},
};

#[derive(Clone)]
pub(crate) struct AppsignalEncoder {
pub transformer: Transformer,
}

impl Encoder<Vec<Event>> for AppsignalEncoder {
fn encode_input(
&self,
events: Vec<Event>,
writer: &mut dyn std::io::Write,
) -> std::io::Result<(usize, GroupedCountByteSize)> {
let mut result = Value::Array(Vec::new());
let mut byte_size = telemetry().create_request_count_byte_size();
for mut event in events {
self.transformer.transform(&mut event);

byte_size.add_event(&event, event.estimated_json_encoded_size_of());

let json = match event {
Event::Log(log) => json!({ "log": log }),
Event::Metric(metric) => json!({ "metric": metric }),
_ => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!(
"The AppSignal sink does not support this type of event: {event:?}"
),
))
}
};
if let Value::Array(ref mut array) = result {
array.push(json);
}
}
let written_bytes =
as_tracked_write::<_, _, std::io::Error>(writer, &result, |writer, value| {
serde_json::to_writer(writer, value)?;
Ok(())
})?;

Ok((written_bytes, byte_size))
}
}
2 changes: 1 addition & 1 deletion src/sinks/appsignal/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use vector_core::event::{

use crate::{
config::SinkConfig,
sinks::appsignal::AppsignalConfig,
sinks::appsignal::config::AppsignalConfig,
sinks::util::test::{build_test_server_status, load_sink},
test_util::{
components::{
Expand Down
Loading

0 comments on commit 230f005

Please sign in to comment.