Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(file sink): Add support for end-to-end acknowledgements #9892

Merged
merged 2 commits into from
Nov 4, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/sinks/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::expiring_hash_map::ExpiringHashMap;
use crate::{
buffers::Acker,
config::{log_schema, DataType, GenerateConfig, SinkConfig, SinkContext, SinkDescription},
event::Event,
event::{Event, EventStatus},
internal_events::{FileBytesSent, FileOpen, TemplateRenderingFailed},
sinks::util::{
encoding::{EncodingConfig, EncodingConfiguration},
Expand Down Expand Up @@ -250,14 +250,15 @@ impl FileSink {
Ok(())
}

async fn process_event(&mut self, event: Event) {
async fn process_event(&mut self, mut event: Event) {
let path = match self.partition_event(&event) {
Some(path) => path,
None => {
// We weren't able to find the path to use for the
// file.
// This is already logged at `partition_event`, so
// here we just skip the event.
event.metadata().update_status(EventStatus::Errored);
return;
}
};
Expand All @@ -277,6 +278,7 @@ impl FileSink {
// Maybe other events will work though! Just log
// the error and skip this event.
error!(message = "Unable to open the file.", path = ?path, %error);
event.metadata().update_status(EventStatus::Errored);
return;
}
};
Expand All @@ -292,8 +294,10 @@ impl FileSink {

trace!(message = "Writing an event to file.", path = ?path);
let event_size = event.size_of();
let finalizers = event.metadata_mut().take_finalizers();
match write_event_to_file(file, event, &self.encoding).await {
Ok(byte_size) => {
finalizers.update_status(EventStatus::Delivered);
emit!(&EventsSent {
count: 1,
byte_size: event_size,
Expand All @@ -303,7 +307,10 @@ impl FileSink {
file: String::from_utf8_lossy(&path),
});
}
Err(error) => error!(message = "Failed to write file.", path = ?path, %error),
Err(error) => {
finalizers.update_status(EventStatus::Errored);
error!(message = "Failed to write file.", path = ?path, %error);
}
}
}
}
Expand Down