chore: Rework the Datadog logs sink #8825
Conversation
80bdfc4 is the first version of this work that runs. It runs at twice the speed of master.
src/sinks/datadog/logs/sink.rs
Outdated
    }

    /// Flush the `Event` batches out to Datadog API
    async fn flush_to_api(&mut self) -> Result<(), ()> {
Thought: what if this returned futures and the select loop was responsible for polling? That way we can interleave flushes where we cannot presently.
If we center back-pressure, concurrency, etc. in the client it will need to be a tower thing. Happily, `Service::call` returns a future, so we can await that before building up futures.
I haven't fully reviewed the structure of this, other than noting it looks much simpler, just leaving notes about event finalization.
src/sinks/datadog/logs/sink.rs
Outdated
    let members: Vec<BTreeMap<String, Value>> = batch
        .into_iter()
        .map(|event| event.into_log())
        .map(|log| log.into_parts().0)
        .collect();
Ok, so here you break the log events into parts and drop the metadata. This marks the event as "dropped" for finalization. I think your best bet here is to iterate through the batch, map the events with `event.as_mut_log().take_finalizer()`, and collect those into a batch, i.e.:
let finalizers: Vec<_> = batch.iter_mut().map(|event| event.as_mut_log().take_finalizer()).collect();
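A compile-checkable sketch of that suggestion, with stand-in types (`Event`, `LogEvent`, and `EventFinalizer` below are simplified placeholders for Vector's real types, not its actual API):

```rust
// Stand-in for Vector's finalizer type.
#[derive(Debug, PartialEq)]
struct EventFinalizer(u64);

// Stand-in for Vector's `LogEvent`: owns an optional finalizer.
struct LogEvent {
    finalizer: Option<EventFinalizer>,
}

impl LogEvent {
    // Move the finalizer out, leaving `None` behind.
    fn take_finalizer(&mut self) -> Option<EventFinalizer> {
        self.finalizer.take()
    }
}

// Stand-in for Vector's `Event` wrapper.
struct Event(LogEvent);

impl Event {
    fn as_mut_log(&mut self) -> &mut LogEvent {
        &mut self.0
    }
}

// The suggested pattern: extract finalizers before breaking events apart,
// so delivery status can still be reported after the batch is consumed.
fn take_finalizers(batch: &mut [Event]) -> Vec<Option<EventFinalizer>> {
    batch
        .iter_mut()
        .map(|event| event.as_mut_log().take_finalizer())
        .collect()
}
```

The point of the shape is that the finalizers survive independently of the batch, so dropping metadata later no longer marks the events as dropped.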
src/sinks/datadog/logs/sink.rs
Outdated
        // .map_err(Into::into)
        .expect("failed to make request");

    self.http_client.call(request)
Then, after this call resolves, you would need to catch the result of the call and call `update_status`, something like this, replacing `self.http_client.call(request)`:
    async move {
        let result = self.http_client.call(request).await;
        let status: EventStatus = match result { … };
        for finalizer in finalizers {
            finalizer.update_status(status);
        }
        result
    }
Very helpful comments @bruceg. Thank you.
I've added in the basic telemetry plus some starting in b6832a4. Now that we have access to the …
* Introduce a `rename_key` on `LogEvent`

This commit introduces a new function to `LogEvent` that allows for in-place key renames. This resolves #8491 and is an extract from the ongoing #8825.

Signed-off-by: Brian L. Troutwine <brian@troutwine.us>

* feedback
* Correct function reference name
This commit extracts the two traits mentioned in the subject from #8884. These traits allow us to work with `Event` instances in a type-system-generic way and will have bearing on work in #8825 as well. This work was originally done by @tobz in #8884. Signed-off-by: Brian L. Troutwine <brian@troutwine.us>
This commit is an extract of the work being done in #8825. The goal with that work is to change the Datadog logs sink to be done purely in terms of `Stream`, with the goal of improving all our sinks along similar lines. Most of the code in that PR is hampered by the lack of streaming batching, whence this commit. I've introduced a new type into vector-core: `Batcher`. This builds on the `Partitioner` notion introduced in #8914 to divvy up incoming items into a key space and ensure that blobs of items are issued when they consume too much memory, when the blob is full, or when the key has sat too long unflushed. This commit includes proptest property tests, as opposed to quickcheck, with the hope that one day we'll be able to use [propverify](https://github.com/project-oak/rust-verification-tools) in vector. Signed-off-by: Brian L. Troutwine <brian@troutwine.us>
* Introduce a `Batcher` stream processor

This commit is an extract of the work being done in #8825. The goal with that work is to change the Datadog logs sink to be done purely in terms of `Stream`, with the goal of improving all our sinks along similar lines. Most of the code in that PR is hampered by the lack of streaming batching, whence this commit. I've introduced a new type into vector-core: `Batcher`. This builds on the `Partitioner` notion introduced in #8914 to divvy up incoming items into a key space and ensure that blobs of items are issued when they consume too much memory, when the blob is full, or when the key has sat too long unflushed. This commit includes proptest property tests, as opposed to quickcheck, with the hope that one day we'll be able to use [propverify](https://github.com/project-oak/rust-verification-tools) in vector.

Signed-off-by: Brian L. Troutwine <brian@troutwine.us>

* Adjust type expectations
* Avoid a Poll::Pending
* Avoid shipping under-full batches

Caught by Luke: in the event that the timer fired, we would ship under-full batches for the current partition. Resolved.

* Make time async
* clippy dings
* comment bug
* use Delay not Skip
* documentation request
* destruct -> into_inner
* Collapse timer response
* tighten up function
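The flush conditions that commit describes (byte budget, item count, linger timeout) can be sketched with std-only stand-ins. The names, limits, and `push`/`flush_expired` API below are illustrative, not the real `Batcher` interface:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// One in-progress batch for a single partition key.
struct Batch<T> {
    items: Vec<T>,
    bytes: usize,
    first_seen: Instant,
}

// Hypothetical partitioned batcher: divvies items into a key space and
// flushes on item count, byte budget, or linger timeout.
struct Batcher<T> {
    partitions: HashMap<String, Batch<T>>,
    max_items: usize,
    max_bytes: usize,
    linger: Duration,
}

impl<T> Batcher<T> {
    fn new(max_items: usize, max_bytes: usize, linger: Duration) -> Self {
        Self { partitions: HashMap::new(), max_items, max_bytes, linger }
    }

    // Push an item into its partition; return the partition's items if the
    // item-count or byte-budget condition is now met.
    fn push(&mut self, key: String, item: T, size: usize) -> Option<Vec<T>> {
        let batch = self.partitions.entry(key.clone()).or_insert_with(|| Batch {
            items: Vec::new(),
            bytes: 0,
            first_seen: Instant::now(),
        });
        batch.items.push(item);
        batch.bytes += size;
        if batch.items.len() >= self.max_items || batch.bytes >= self.max_bytes {
            return self.partitions.remove(&key).map(|b| b.items);
        }
        None
    }

    // Flush every partition whose key has sat unflushed past the linger.
    fn flush_expired(&mut self) -> Vec<Vec<T>> {
        let now = Instant::now();
        let expired: Vec<String> = self
            .partitions
            .iter()
            .filter(|(_, b)| now.duration_since(b.first_seen) >= self.linger)
            .map(|(k, _)| k.clone())
            .collect();
        expired
            .into_iter()
            .filter_map(|k| self.partitions.remove(&k).map(|b| b.items))
            .collect()
    }
}
```

The real `Batcher` drives these decisions from a `Stream`; this sketch only shows the bookkeeping behind the three flush triggers.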
This is the first version of this work that runs. It's about double the speed of the original. Acks are not implemented, nor are concurrency limits obeyed. Signed-off-by: Brian L. Troutwine <brian@troutwine.us>
This is looking good! Just a couple of small notes.
async fn run_io<S>(mut rx: Receiver<LogApiRequest>, mut service: S, acker: Acker)
where
    S: Service<LogApiRequest>,
    S::Future: Send + 'static,
    S::Response: Response + Send + 'static,
    S::Error: Debug + Into<crate::Error> + Send,
{
    let in_flight = FuturesUnordered::new();
    let mut pending_acks: HashMap<u64, usize, BuildHasherDefault<XxHash64>> = HashMap::default();
    let mut seq_head: u64 = 0;
    let mut seq_tail: u64 = 0;

    pin!(in_flight);

    loop {
        gauge!("inflight_requests", in_flight.len() as f64);
        select! {
            Some(req) = rx.recv() => {
                // Rebind the variable to avoid a bug with the pattern matching
                // in `select!`: https://github.com/tokio-rs/tokio/issues/4076
                let mut req = req;
                let seqno = seq_head;
                seq_head += 1;

                let (tx, rx) = oneshot::channel();

                in_flight.push(rx);

                trace!(
                    message = "Submitting service request.",
                    in_flight_requests = in_flight.len()
                );
                // TODO: This likely need be parameterized, which builds a
                // stronger case for following through with the comment
                // mentioned below.
                let logic = StdServiceLogic::default();
                // TODO: I'm not entirely happy with how we're smuggling
                // batch_size/finalizers this far through, from the finished
                // batch all the way through to the concrete request type...we
                // lifted this code from `ServiceSink` directly, but we should
                // probably treat it like `PartitionBatcher` and shove it into a
                // single, encapsulated type instead.
                let batch_size = req.payload_members_len;
                let finalizers = req.take_finalizers();

                let svc = service.ready().await.expect("should not get error when waiting for svc readiness");
                let fut = svc.call(req)
                    .err_into()
                    .map(move |result| {
                        logic.update_finalizers(result, finalizers);

                        // If the rx end is dropped we still completed
                        // the request so this is a weird case that we can
                        // ignore for now.
                        let _ = tx.send((seqno, batch_size));
                    })
                    .instrument(info_span!("request", request_id = %seqno));
                tokio::spawn(fut);
            },

            Some(Ok((seqno, batch_size))) = in_flight.next() => {
                trace!("pending batch {} finished (n={})", seqno, batch_size);
                pending_acks.insert(seqno, batch_size);

                let mut num_to_ack = 0;
                while let Some(ack_size) = pending_acks.remove(&seq_tail) {
                    num_to_ack += ack_size;
                    seq_tail += 1
                }
                trace!(message = "Acking events.", acking_num = num_to_ack);
                acker.ack(num_to_ack);
            },

            else => break
        }
    }
}
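The seqno/`pending_acks` bookkeeping in the loop above (ack events only once every earlier batch has also finished, even when requests complete out of order) can be isolated into a small std-only sketch; `AckTracker` is a hypothetical name, not a type from the PR:

```rust
use std::collections::HashMap;

// In-order ack bookkeeping: batches finish out of order, but events are
// only acked once every batch with a smaller sequence number has finished.
struct AckTracker {
    pending: HashMap<u64, usize>,
    seq_tail: u64,
}

impl AckTracker {
    fn new() -> Self {
        Self { pending: HashMap::new(), seq_tail: 0 }
    }

    // Record that batch `seqno` (of `batch_size` events) finished; return
    // how many events are now safe to ack contiguously from the tail.
    fn finish(&mut self, seqno: u64, batch_size: usize) -> usize {
        self.pending.insert(seqno, batch_size);
        let mut num_to_ack = 0;
        while let Some(ack_size) = self.pending.remove(&self.seq_tail) {
            num_to_ack += ack_size;
            self.seq_tail += 1;
        }
        num_to_ack
    }
}
```

This mirrors the `seq_head`/`seq_tail` pair in `run_io`: out-of-order completions park in the map until the tail catches up.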
I agree that the semantics of a layer are maybe not the best fit here, but it seems like it's already very close to being reusable. I think all we'd need to do is make it generic over the request type and we could just reuse this as a function here and in the S3 sink. Could be a good intermediate step at least.
        let request_builder = RequestBuilder::new(encoding.clone(), compression, log_schema);
        tokio::spawn(async move { request_builder.build(key, batch) })
    })
    .buffer_unordered(io_bandwidth);
Between this and the IO channel, will we end up with 128 built requests buffered before we propagate backpressure from the service itself? That's not necessarily bad, we'll just want to keep in mind memory use, responsiveness, etc.
It's true. The user's in-memory limit is, so far as I'm aware, soft in our documentation, so it's within our constraints to add this padding, but I don't love it. Similar thing in the s3 sink(s). Maybe we do a follow-up PR to deduct any internal buffering from the batch limit set by the user? That implies a need to set a minimal batch size at the config level, I think.
FWIW, in practice I can't get this buffer to actually populate all that much. Every so often I wonder what a fixed-memory vector would look like, and these buffers would sure need to count against the global memory.
As of 0029dec I have an error in place for when the payload crosses Datadog's 5Mb uncompressed byte limit. I'd be open to thoughts on how to properly deal with this. I have tried making … What I'm going to propose is a little chintzy and I want to call it out before doing it. In b0056ed I introduced a slightly lower batch bytes goal, 750KB less than the maximum. This leaves the request building dead simple: we kick out an error if the payload is overlarge, manifesting as a log message. The …
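A sketch of that scheme under stated assumptions: the 5Mb limit is taken as 5,000,000 bytes for illustration, the 750KB headroom is the margin described above, and `BuildError`/`check_payload` are hypothetical names, not the PR's types:

```rust
// Datadog's uncompressed payload ceiling (illustrative byte value).
const MAX_PAYLOAD_BYTES: usize = 5_000_000;
// Headroom so batches aim well under the ceiling.
const HEADROOM_BYTES: usize = 750_000;
// The batcher targets this lower goal; the hard limit is only a backstop.
const BATCH_GOAL_BYTES: usize = MAX_PAYLOAD_BYTES - HEADROOM_BYTES;

#[derive(Debug, PartialEq)]
enum BuildError {
    // Overlarge payloads become an error (surfaced as a log message)
    // rather than being split and re-batched.
    PayloadTooBig { bytes: usize },
}

fn check_payload(bytes: usize) -> Result<(), BuildError> {
    if bytes > MAX_PAYLOAD_BYTES {
        Err(BuildError::PayloadTooBig { bytes })
    } else {
        Ok(())
    }
}
```

With the goal set below the limit, a correctly batched payload should never trip the backstop in practice.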
Overall this looks good, but I think the bits about API key and batching behavior need some clarity.
        match status {
            StatusCode::OK => Ok(LogApiResponse::Ok),
            StatusCode::BAD_REQUEST => Ok(LogApiResponse::BadRequest),
            StatusCode::FORBIDDEN => Ok(LogApiResponse::PermissionIssue),
            StatusCode::PAYLOAD_TOO_LARGE => Ok(LogApiResponse::PayloadTooLarge),
            _ => Err(LogApiError::ServerError),
        }
    }
    Err(error) => Err(LogApiError::HttpError { error }),
I'm not entirely clear on this. Why aren't we pushing bad request, permission issue, etc, as errors? We can still classify them as non-retriable, but it seems like they are demonstrably errors.
This I don't love. I think what happened is I got confused, while working on this, between the call requirements imposed by `RetryLogic` and those implied by `sink::Response`. Looking at it, I'm still not totally sure why I wrote it this way, or whether kicking these others out as errors would be better or worse. I'm open to suggestions.
I would like to get to a point where we have one way of classifying whether a request succeeded or failed. Perhaps I'm missing something, but the two traits referenced strike me as redundant, if not quite identical.
So I took another look at all of this, and had a few follow-up thoughts:
- You shouldn't need that `sink::Response` implementation, I don't think. That's old `Sink`-based stuff.
- `RetryLogic` has an overridable method -- `should_retry_response` -- that can classify responses, which would be appropriate here, I believe.
Generally, I think most of the known errors aren't retryable -- payload size isn't gonna magically become smaller, etc. -- but it would be the more correct implementation to fill in `should_retry_response`.
My meta-thought here is more around the notion that we have responses that are known to be errors, but we don't seem to do anything about that in the way of logging. Like, part of it just feels a little weird, you know?
Aaah, I think you are correct about `sink::Response`. I think this trait could be relaxed out of the s3 sink then as well. I'll fiddle with this and file a follow-up issue if it works out.
Regarding the logging, I thought for sure there was some somewhere, but I'm failing to find it presently.
Ah. `sink::Response` is needed by `StdServiceLogic`, that's the trick there. I think in order for the finalizers to fire correctly this oddball structure is necessary, which does in turn make the error signaling weird. It seems to me we could change `ServiceLogic` to use just `RetryLogic` to classify success/failure -- or make a new trait to replace it -- but this is a weird halfway state.
With regard to the logging, `StdServiceLogic`'s implementation of `ServiceLogic::result_status` does that.
... yes. Yes, I think the only tie here to `sink::Response` is the finalizers, through `ServiceLogic`, but that I do believe we could clear out. We only need to get from a request result to an `EventStatus`, which seems very doable in this PR.
Ah, whoops, right. That's what I get for late-night PR reviewing.
Alright, screw it, this is a take-it-or-leave-it: we should be able to bury the finalization logic into a layer while taking advantage of `RetryLogic`. Or we rename/clone `RetryLogic` to something more generic. If you want to flesh that out here as a proof-of-concept for starting to abstract this stuff, I'm game.
I'll file a follow-up ticket to keep this PR from going on too, too long. I think it'll involve me fiddling in the s3 sink as well, since this pattern exists there too.
Follow-up: #9140
Overall, I think this is ready, modulo any remaining cleanup or tweaks you want to do.
I don't really have much to say about the general approach. Nothing stood out as obviously bad, though I share @tobz's concerns about lumping error results into the `Ok` response in the service. I have a few minor questions below.
    let sink = VectorSink::Sink(Box::new(sink));

    Ok((sink, healthcheck))
    Ok(HttpClient::new(tls_settings, proxy)?)
The `Ok(X?)` form here seems redundant. Can this be done without the `?`?
The types don't line up properly. The `crate::Result` requires a boxed dynamic instance of `StdError + Send + Sync + 'static` and the client returns an `http::HttpError`.
Yeah, then it becomes `HttpClient::new(tls_settings, proxy).map_err(Into::into)`. Tomayto, tomahto.
What I'd really like to see -- and I think it's possible once we have our first pass over the sinks done -- is to make the error generic in the topology so we can drop the boxing.
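For reference, the two forms under discussion are equivalent because `?` applies the same `From` conversion that `map_err(Into::into)` spells out explicitly. A self-contained sketch with stand-in types (this `HttpError` is a placeholder, not the real `http::HttpError`, and `new_client` is a hypothetical constructor):

```rust
use std::fmt;

// Stand-in concrete error type, implementing std::error::Error so it can
// be boxed into the crate-style result alias below.
#[derive(Debug)]
struct HttpError;

impl fmt::Display for HttpError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "http error")
    }
}

impl std::error::Error for HttpError {}

// Mirrors the shape of `crate::Result`: a boxed dynamic error.
type CrateResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync + 'static>>;

// Hypothetical fallible constructor returning the concrete error.
fn new_client(fail: bool) -> Result<&'static str, HttpError> {
    if fail { Err(HttpError) } else { Ok("client") }
}

// `Ok(x?)` converts the error via `From` at the `?`...
fn build_a(fail: bool) -> CrateResult<&'static str> {
    Ok(new_client(fail)?)
}

// ...which is the same conversion `map_err(Into::into)` spells out.
fn build_b(fail: bool) -> CrateResult<&'static str> {
    new_client(fail).map_err(Into::into)
}
```

Both compile to the same boxing conversion; which spelling to prefer is, as the thread says, tomayto-tomahto.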
    let (_expected, mut rx) = start_test(StatusCode::FORBIDDEN, BatchStatus::Failed).await;
    let (_expected, mut rx) = start_test(StatusCode::FORBIDDEN, BatchStatus::Errored).await;
Why did this error result change? This would be a semantic change in acknowledgement handling of errors.
The status here is `FORBIDDEN`, which against the Datadog API could be transient -- if your API key will be valid in the future, for instance, or we have an auth outage. My understanding was that `Errored` is for transient issues, where `Failed` is for permanent problems.
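That transient-vs-permanent split can be sketched as a plain classification function. `EventStatus` here is a stand-in for Vector's finalization enum, and the exact status groupings are illustrative, not the sink's actual table:

```rust
// Stand-in for Vector's event finalization status.
#[derive(Debug, PartialEq)]
enum EventStatus {
    Delivered,
    // Transient problem: retrying later may succeed.
    Errored,
    // Permanent problem: retrying will not help.
    Failed,
}

fn status_for(http_status: u16) -> EventStatus {
    match http_status {
        200..=299 => EventStatus::Delivered,
        // 403 can be transient against the Datadog API (key not yet valid,
        // auth outage), hence Errored rather than Failed; 5xx likewise.
        403 | 500..=599 => EventStatus::Errored,
        // e.g. 400 (malformed payload), 413 (too large): permanent.
        _ => EventStatus::Failed,
    }
}
```

The test change above (Failed → Errored for `FORBIDDEN`) corresponds to moving 403 into the transient bucket.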
    let service = self
        .service
        .take()
        .expect("same sink should not be run twice");
`const`?
I'm sorry, I don't understand the question.
My bad. I wondered if making that string (same sink) into a `const` item would be worth doing. It might make `rustfmt` wrap the expression onto one line, if you're careful.
Aaah I see what you mean.
This commit removes the referenced trait from the sink and from the `TowerRequestLayer` implementation. Other callers can choose to leave this constraint in place but it is not required. This change has also allowed me to clean up the odd Ok/Err split noted in #8825. I intend to do a similar bit of work to the s3 sinks. REF #9140 Signed-off-by: Brian L. Troutwine <brian@troutwine.us>
* Removes `sink::Response` from the Datadog Logs sink

This commit removes the referenced trait from the sink and from the `TowerRequestLayer` implementation. Other callers can choose to leave this constraint in place, but it is not required. This change has also allowed me to clean up the odd Ok/Err split noted in #8825. I intend to do a similar bit of work to the s3 sinks.

REF #9140

Signed-off-by: Brian L. Troutwine <brian@troutwine.us>

* Use `AsRef` instead of `Into`

The use of `Into` for this purpose had two problems. First, and less important, being able to express `EventStatus` -> `*ApiResponse` is weird. Second, and more important, taking ownership of the API response _happens_ to work for this sink but is not a guarantee for other sinks.

* dinged by check-events
This PR rethinks how our Datadog logs sink works, with an eye toward applying a similar transformation to each of our sinks. The work here is driven by the observation in #8512 that we spend a great deal of time in vector polling futures. After tracing out the path of an `Event` through our previous stack, it's clear why we're doing that: the top portion of this diagram details the type hierarchy at play, and the bottom the actual code path in practice for an event, borne out by examining live stack traces. None of these calls are inlined with one another -- consistent with the amount of boxing and dynamic dispatch going on -- so we do in fact pay for all the polls. It's also quite hard to understand this hierarchy and make changes to it.
So, this PR is a rethink of the whole setup as it affects the Datadog logs sink. Highlights:
This approach is at least twice as fast as our present approach. I expect the patch to be code positive but only because of inlined rustdocs.
Things missing as of this writing:
With regard to the last point, these are accomplished today by use of tower-based layering. I haven't decided whether the `HttpClient` should be responsible for participating in these concerns -- leaving the stream processor ignorant of them -- or if the stream processor should itself be a `Service`.
Method
The basic method at play in the previous hierarchy was inheritance. A `DatadogLogsSink` is a type of `HttpSink` that exposes a few methods and is called by some obscure mechanism elsewhere. What I'm pushing for here is composition. A `LogApi` is a stream processor that is composed with an http client that knows about back-pressure, concurrency limits, etc., and clicks this together with buffering logic specific to the logs API.
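The composition being argued for can be sketched in a few lines; the `Client` trait, `CountingClient`, and this `LogApi` shape are illustrative stand-ins, not Vector's actual types:

```rust
// The client abstraction the stream processor composes with. In the real
// sink this would be a tower `Service` that handles back-pressure and
// concurrency; here it is reduced to a plain trait.
trait Client {
    fn send(&mut self, payload: Vec<u8>) -> Result<(), String>;
}

// A trivial client that records how many payloads it was handed.
struct CountingClient {
    sent: usize,
}

impl Client for CountingClient {
    fn send(&mut self, payload: Vec<u8>) -> Result<(), String> {
        let _ = payload;
        self.sent += 1;
        Ok(())
    }
}

// The stream processor: it OWNS a client (composition) rather than
// inheriting behavior from a sink base type.
struct LogApi<C: Client> {
    client: C,
}

impl<C: Client> LogApi<C> {
    // Drive batches through the composed client; return how many succeeded.
    fn run(&mut self, batches: Vec<Vec<u8>>) -> usize {
        let mut ok = 0;
        for batch in batches {
            if self.client.send(batch).is_ok() {
                ok += 1;
            }
        }
        ok
    }
}
```

Swapping `CountingClient` for a real HTTP client changes nothing about `LogApi` itself, which is the point of the composition argument.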