Skip to content

Commit

Permalink
chore(appsignal sink): Refactor to use StreamSink
Browse files Browse the repository at this point in the history
Previously, the AppSignal sink was written in what was already a bit of
an older style in PR vectordotdev#16650.

We want to change some functionality in the future for how metrics are
sent. To do this, it looks like we'll need to use the newer sink style,
or at least it will be easier. With this change, the AppSignal sink's
functionality has remained the same.

We have updated the sink to the new StreamSink style, using a
HttpBatchService wrapper to send the requests to the AppSignal public
endpoint API. We followed the [sink guides][2] initially and looked at
other sinks already rewritten linked in [issue vectordotdev#9261][1] to see how to
implement it further.

Updated the integration_tests to test if the sink is a HTTP sink with
the `HTTP_SINK_TAGS`. Previously, it didn't test yet if the
`EndpointBytesSent` event was sent.

We're unsure if `AppsignalResponse`'s `bytes_sent` needs to be
implemented or not. If it returns `None` the tests also pass, but we
thought we might as well implement it properly.

Part of [tracking issue vectordotdev#9261][1]

[1]: vectordotdev#9261
[2]: https://github.com/vectordotdev/vector/blob/600f8191a8fe169eb38c429958dd59714349acb4/docs/tutorials/sinks/1_basic_sink.md

Co-authored-by: Jeff Kreeftmeijer <jeff@kreeft.me>
  • Loading branch information
tombruijn and jeffkreeftmeijer committed Aug 10, 2023
1 parent 8454a6f commit eeb9755
Show file tree
Hide file tree
Showing 3 changed files with 520 additions and 120 deletions.
197 changes: 178 additions & 19 deletions src/sinks/appsignal/integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,78 @@
use futures::stream;
use bytes::Bytes;
use futures::{channel::mpsc::Receiver, stream, StreamExt};
use http::header::AUTHORIZATION;
use hyper::StatusCode;
use indoc::indoc;
use vector_core::event::{BatchNotifier, BatchStatus, Event, Metric, MetricKind, MetricValue};
use vector_core::event::{
BatchNotifier, BatchStatus, Event, LogEvent, Metric, MetricKind, MetricValue,
};

use crate::{
config::SinkConfig,
sinks::appsignal::AppsignalSinkConfig,
sinks::util::test::load_sink,
sinks::appsignal::AppsignalConfig,
sinks::util::test::{build_test_server_status, load_sink},
test_util::{
components::{
assert_sink_compliance, assert_sink_error, run_and_assert_sink_compliance,
COMPONENT_ERROR_TAGS, SINK_TAGS,
COMPONENT_ERROR_TAGS, HTTP_SINK_TAGS,
},
generate_lines_with_stream, map_event_batch_stream,
generate_lines_with_stream, map_event_batch_stream, next_addr,
},
};

async fn start_test(events: Vec<Event>) -> (Vec<Event>, Receiver<(http::request::Parts, Bytes)>) {
let config = indoc! {r#"
push_api_key = "${TEST_APPSIGNAL_PUSH_API_KEY}"
compression = "none"
"#};
let config = config.replace("${TEST_APPSIGNAL_PUSH_API_KEY}", &push_api_key());
let (mut config, cx) = load_sink::<AppsignalConfig>(config.as_str()).unwrap();
let addr = next_addr();
// Set the endpoint to a local server so we can fetch the sent events later
config.endpoint = format!("http://{}", addr);

let (sink, _) = config.build(cx).await.unwrap();

// Always return OK from server. We're not testing responses.
let (rx, _trigger, server) = build_test_server_status(addr, StatusCode::OK);
tokio::spawn(server);

let (batch, receiver) = BatchNotifier::new_with_receiver();

let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch));

sink.run(stream).await.unwrap();
assert_eq!(receiver.await, BatchStatus::Delivered);

(events, rx)
}

#[tokio::test]
async fn logs_real_endpoint() {
let config = indoc! {r#"
push_api_key = "${TEST_APPSIGNAL_PUSH_API_KEY}"
"#};
let api_key = std::env::var("TEST_APPSIGNAL_PUSH_API_KEY")
.expect("couldn't find the AppSignal push API key in environment variables");
assert!(!api_key.is_empty(), "$TEST_APPSIGNAL_PUSH_API_KEY required");
let config = config.replace("${TEST_APPSIGNAL_PUSH_API_KEY}", &api_key);
let (config, cx) = load_sink::<AppsignalSinkConfig>(config.as_str()).unwrap();
let config = config.replace("${TEST_APPSIGNAL_PUSH_API_KEY}", &push_api_key());
let (config, cx) = load_sink::<AppsignalConfig>(config.as_str()).unwrap();

let (sink, _) = config.build(cx).await.unwrap();
let (batch, receiver) = BatchNotifier::new_with_receiver();
let generator = |index| format!("this is a log with index {}", index);
let (_, events) = generate_lines_with_stream(generator, 10, Some(batch));

run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await;
run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;

assert_eq!(receiver.await, BatchStatus::Delivered);
}

#[tokio::test]
async fn metrics_real_endpoint() {
assert_sink_compliance(&SINK_TAGS, async {
assert_sink_compliance(&HTTP_SINK_TAGS, async {
let config = indoc! {r#"
push_api_key = "${TEST_APPSIGNAL_PUSH_API_KEY}"
"#};
let api_key = std::env::var("TEST_APPSIGNAL_PUSH_API_KEY")
.expect("couldn't find the AppSignal push API key in environment variables");
assert!(!api_key.is_empty(), "$TEST_APPSIGNAL_PUSH_API_KEY required");
let config = config.replace("${TEST_APPSIGNAL_PUSH_API_KEY}", &api_key);
let (config, cx) = load_sink::<AppsignalSinkConfig>(config.as_str()).unwrap();
let config = config.replace("${TEST_APPSIGNAL_PUSH_API_KEY}", &push_api_key());
let (config, cx) = load_sink::<AppsignalConfig>(config.as_str()).unwrap();

let (sink, _) = config.build(cx).await.unwrap();
let (batch, receiver) = BatchNotifier::new_with_receiver();
Expand All @@ -69,13 +95,139 @@ async fn metrics_real_endpoint() {
.await;
}

#[tokio::test]
async fn metrics_shape() {
let events: Vec<_> = (0..5)
.map(|index| {
Event::Metric(Metric::new(
format!("counter_{}", index),
MetricKind::Absolute,
MetricValue::Counter {
value: index as f64,
},
))
})
.collect();
let api_key = push_api_key();
let (expected, rx) = start_test(events).await;
let output = rx.take(expected.len()).collect::<Vec<_>>().await;

for val in output.iter() {
assert_eq!(
val.0.headers.get("Content-Type").unwrap(),
"application/json"
);
assert_eq!(
val.0.headers.get(AUTHORIZATION).unwrap(),
&format!("Bearer {api_key}")
);

let payload = std::str::from_utf8(&val.1).unwrap();
let payload: serde_json::Value = serde_json::from_str(payload).unwrap();
let events = payload.as_array().unwrap();
assert_eq!(events.len(), 5);

let metrics: Vec<(&str, &str, f64)> = events
.iter()
.map(|json_value| {
let metric = json_value
.as_object()
.unwrap()
.get("metric")
.unwrap()
.as_object()
.unwrap();
let name = metric.get("name").unwrap().as_str().unwrap();
let kind = metric.get("kind").unwrap().as_str().unwrap();
let counter = metric.get("counter").unwrap().as_object().unwrap();
let value = counter.get("value").unwrap().as_f64().unwrap();
(name, kind, value)
})
.collect();
assert_eq!(
vec![
("counter_0", "absolute", 0.0),
("counter_1", "absolute", 1.0),
("counter_2", "absolute", 2.0),
("counter_3", "absolute", 3.0),
("counter_4", "absolute", 4.0),
],
metrics
);
}
}

#[tokio::test]
async fn logs_shape() {
let events: Vec<_> = (0..5)
.map(|index| Event::Log(LogEvent::from(format!("Log message {index}"))))
.collect();
let api_key = push_api_key();
let (expected, rx) = start_test(events).await;
let output = rx.take(expected.len()).collect::<Vec<_>>().await;

for val in output.iter() {
assert_eq!(
val.0.headers.get("Content-Type").unwrap(),
"application/json"
);
assert_eq!(
val.0.headers.get(AUTHORIZATION).unwrap(),
&format!("Bearer {api_key}")
);

let payload = std::str::from_utf8(&val.1).unwrap();
let payload: serde_json::Value = serde_json::from_str(payload).unwrap();
let events = payload.as_array().unwrap();
assert_eq!(events.len(), 5);

let log_messages: Vec<&str> = events
.iter()
.map(|value| {
value
.as_object()
.unwrap()
.get("log")
.unwrap()
.as_object()
.unwrap()
.get("message")
.unwrap()
.as_str()
.unwrap()
})
.collect();
assert_eq!(
vec![
"Log message 0",
"Log message 1",
"Log message 2",
"Log message 3",
"Log message 4",
],
log_messages
);

let event = events
.last()
.unwrap()
.as_object()
.unwrap()
.get("log")
.unwrap()
.as_object()
.unwrap();
assert!(!event.get("timestamp").unwrap().as_str().unwrap().is_empty());
}
}

#[tokio::test]
async fn error_scenario_real_endpoint() {
assert_sink_error(&COMPONENT_ERROR_TAGS, async {
let config = indoc! {r#"
push_api_key = "invalid key"
"#};
let (config, cx) = load_sink::<AppsignalSinkConfig>(config).unwrap();
let (config, cx) = load_sink::<AppsignalConfig>(config).unwrap();

let (sink, _) = config.build(cx).await.unwrap();
let (batch, receiver) = BatchNotifier::new_with_receiver();
Expand All @@ -91,3 +243,10 @@ async fn error_scenario_real_endpoint() {
})
.await;
}

fn push_api_key() -> String {
let api_key = std::env::var("TEST_APPSIGNAL_PUSH_API_KEY")
.expect("couldn't find the AppSignal push API key in environment variables");
assert!(!api_key.is_empty(), "$TEST_APPSIGNAL_PUSH_API_KEY required");
api_key
}
Loading

0 comments on commit eeb9755

Please sign in to comment.