fix(buffers): correctly handle partial writes in reader seek during initialization #17099

Merged · 3 commits · Apr 11, 2023
55 changes: 44 additions & 11 deletions lib/vector-buffers/src/variants/disk_v2/reader.rs
@@ -899,13 +899,42 @@ where
         // update `self.last_reader_record_id`, so basically... just keep reading records until
         // we're past the last record we had acknowledged.
         while self.last_reader_record_id < ledger_last {
-            if self.next().await?.is_none() && self.last_reader_record_id == 0 {
-                // We've hit a point where there's no more data to read. If our "last reader record
-                // ID" hasn't moved at all, that means the buffer was already empty and we're caught
-                // up, so we just pin ourselves to where the ledger says we left off, and we're good
-                // to go.
-                self.last_reader_record_id = ledger_last;
-                break;
+            match self.next().await {
+                Ok(maybe_record) => {
+                    if maybe_record.is_none() && self.last_reader_record_id == 0 {
+                        // We've hit a point where there's no more data to read. If our "last reader record
+                        // ID" hasn't moved at all, that means the buffer was already empty and we're caught
+                        // up, so we just pin ourselves to where the ledger says we left off, and we're good
+                        // to go.
+                        self.last_reader_record_id = ledger_last;
+                        break;
+                    }
+                }
+                Err(e) if e.is_bad_read() => {
+                    // If we hit a bad read during initialization, we should only continue calling
+                    // `next` if we have not advanced _past_ the writer in terms of file ID.
+                    //
+                    // If the writer saw the same error we just saw, it will have rolled itself to
+                    // the next file, lazily: for example, it discovers a bad record at the end of
+                    // file ID 3, so it marks itself to open file ID 4 next, but hasn't yet
+                    // created it, and is still technically indicated as being on file ID 3.
+                    //
+                    // Meanwhile, if _we_ try to also roll to file ID 4 and read from it, we'll deadlock
+                    // ourselves because it doesn't yet exist. However, `next` immediately updates our
+                    // reader file ID as soon as it hits a bad read error, so in this scenario,
+                    // we're now marked as being on file ID 4 while the writer is still on file ID
+                    // 3.
+                    //
+                    // From that, we can determine that when we've hit a bad read error, if our
+                    // file ID is greater than the writer's file ID, we're now essentially
+                    // synchronized.
+                    let (reader_file_id, writer_file_id) =
+                        self.ledger.get_current_reader_writer_file_id();
+                    if reader_file_id > writer_file_id {
+                        break;
+                    }
+                }
+                Err(e) => return Err(e),
             }
         }

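As an aside on the invariant described in the comments above: it can be checked in isolation. Below is a minimal, hedged sketch; `caught_up_after_bad_read` is an illustrative stand-in, not part of the crate, and it mirrors only the plain `>` comparison used in the patch:

```rust
// Illustrative sketch only, not part of the disk_v2 API: the check made
// after `next` returns a bad-read error while seeking during initialization.
fn caught_up_after_bad_read(reader_file_id: u16, writer_file_id: u16) -> bool {
    // After a bad read, `next` has already rolled the reader to the next
    // file ID. If the writer has not created that file yet (it still reports
    // the previous file), reading further would block on a file that does
    // not exist, so the reader can stop seeking: it is effectively caught up.
    reader_file_id > writer_file_id
}

fn main() {
    // Scenario from the comment above: the writer hit a bad record at the
    // end of file ID 3 and will lazily open file ID 4, but still reports
    // file ID 3; the reader has already rolled to file ID 4.
    assert!(caught_up_after_bad_read(4, 3));

    // If the writer has also reached file ID 4, more data may exist there,
    // so the seek loop should keep reading.
    assert!(!caught_up_after_bad_read(4, 4));
}
```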
@@ -965,11 +994,15 @@ where

         let (reader_file_id, writer_file_id) = self.ledger.get_current_reader_writer_file_id();

-        // Essentially: is the writer still writing to this data file or not?
+        // Essentially: is the writer still writing to this data file or not, and are we
+        // actually ready to read (aka initialized)?
         //
-        // A necessary invariant to have to understand if the record reader should actually keep
-        // waiting for data, or if a data file had a partial write/missing data and should be skipped.
-        let is_finalized = reader_file_id != writer_file_id;
+        // This is a necessary invariant to understand if the record reader should actually keep
+        // waiting for data, or if a data file had a partial write/missing data and should be
+        // skipped. In particular, not only does this matter for deadlocking during shutdown due
+        // to improper writer behavior/flushing, but it also matters during initialization in the
+        // case where the current data file had a partial write.
+        let is_finalized = (reader_file_id != writer_file_id) || !self.ready_to_read;

         // Try reading a record, which if successful, gives us a token to actually read/get a
         // reference to the record. This is a slightly-tricky song-and-dance due to rustc not
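To see how the `is_finalized` flag changes behavior on a short read, here is a self-contained sketch under stated assumptions: `DemoReader`, its fields, and `short_read_action` are invented for illustration and are not the crate's actual types or API.

```rust
// Illustrative sketch only: `DemoReader` and its methods are assumptions,
// not the disk_v2 API. It models how `is_finalized` decides between waiting
// for the writer and skipping a data file with a partial write.
struct DemoReader {
    reader_file_id: u16,
    writer_file_id: u16,
    ready_to_read: bool,
}

impl DemoReader {
    fn short_read_action(&self) -> &'static str {
        let is_finalized =
            (self.reader_file_id != self.writer_file_id) || !self.ready_to_read;
        if is_finalized {
            // Nothing more is coming to this data file (the writer has moved
            // on, or we are still initializing): treat a short read as a
            // partial write and skip ahead instead of waiting forever.
            "skip to next data file"
        } else {
            // The writer is still on this file and we are fully initialized:
            // wait for it to flush more data.
            "wait for writer notification"
        }
    }
}

fn main() {
    // During initialization (`ready_to_read == false`), a short read on the
    // file shared with the writer is treated as finalized, which is exactly
    // what prevents the deadlock this PR fixes.
    let during_init = DemoReader {
        reader_file_id: 0,
        writer_file_id: 0,
        ready_to_read: false,
    };
    assert_eq!(during_init.short_read_action(), "skip to next data file");

    // In steady state on the shared file, the reader waits as before.
    let steady_state = DemoReader {
        reader_file_id: 0,
        writer_file_id: 0,
        ready_to_read: true,
    };
    assert_eq!(steady_state.short_read_action(), "wait for writer notification");
}
```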
89 changes: 89 additions & 0 deletions lib/vector-buffers/src/variants/disk_v2/tests/initialization.rs
@@ -0,0 +1,89 @@
use std::time::Duration;

use tokio::time::timeout;
use tracing::Instrument;

use crate::{
    test::{acknowledge, install_tracing_helpers, with_temp_dir, SizedRecord},
    variants::disk_v2::tests::{create_default_buffer_v2, set_file_length},
};

#[tokio::test]
async fn reader_doesnt_block_from_partial_write_on_last_record() {
    // When initializing, the reader will be catching up to the last record it read, which involves
    // reading individual records in the current reader data file until a record is returned whose
    // record ID matches the "last record ID read" field from the ledger.
    //
    // However, if the last record read by the reader was never fully synced to disk, we could be
    // left with a partial write: enough data to read the length delimiter, but not enough data to
    // actually read as many bytes as are indicated by said length delimiter.
    //
    // This would leave us waiting forever for bytes that will never come, because the writer isn't
    // going to do anything, as we're in initialization.
    //
    // This test ensures that if we hit a partial write during initialization, we correctly avoid
    // sitting around forever, waiting for a write that isn't coming.
    let _a = install_tracing_helpers();

    let fut = with_temp_dir(|dir| {
        let data_dir = dir.to_path_buf();

        async move {
            // Create a regular buffer, no customizations required.
            let (mut writer, mut reader, ledger) = create_default_buffer_v2(data_dir.clone()).await;

            // Write a record, and then read it and acknowledge it. This puts the buffer into a
            // state where there's data in the current data file, and the ledger has a non-zero
            // record ID for where it thinks the reader needs to be. This ensures that the reader
            // actually does at least one call to `Reader::next` during `Reader::seek_to_next_record`.
            let first_bytes_written = writer
                .write_record(SizedRecord::new(64))
                .await
                .expect("should not fail to write");
            writer.flush().await.expect("flush should not fail");
            writer.close();

            let first_read = reader
                .next()
                .await
                .expect("should not fail to read record")
                .expect("should contain first record");
            assert_eq!(SizedRecord::new(64), first_read);
            acknowledge(first_read).await;

            let second_read = reader.next().await.expect("should not fail to read record");
            assert!(second_read.is_none());

            ledger.flush().expect("should not fail to flush ledger");

            // Grab the current writer data file path before dropping the buffer.
            let data_file_path = ledger.get_current_writer_data_file_path();
            drop(reader);
            drop(writer);
            drop(ledger);

            // Open the data file and drop the last eight bytes of the record, which will ensure
            // that there is less data available to read than the number of bytes indicated by the
            // record's length delimiter.
            let initial_len = first_bytes_written as u64;
            let target_len = initial_len - 8;
[Inline review thread on `let target_len = initial_len - 8;`]

Contributor: I can't think of any, but could there be a case where 8 doesn't trigger this condition - should it be arbitrary?

Contributor (Author): In this case, all that matters is that we remove a portion of the actual record payload, so that the number of bytes encoded in the length delimiter is greater than the number of bytes available to read after it.
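To make that condition concrete, a small illustrative sketch (the framing and the numbers are simplified assumptions; the real on-disk record layout is defined by the buffer's serialization code):

```rust
// Illustrative only: the real record layout is defined by the buffer's
// serialization code. The condition that matters is that the length
// delimiter promises more payload bytes than actually remain in the file.
fn is_partial_record(delimited_len: u64, bytes_remaining: u64) -> bool {
    bytes_remaining < delimited_len
}

fn main() {
    // The length delimiter claims a 64-byte payload follows.
    let delimited_len = 64;

    // Cutting any portion of the payload (here, the test's 8 bytes)
    // triggers the partial-write condition...
    assert!(is_partial_record(delimited_len, 64 - 8));
    assert!(is_partial_record(delimited_len, 64 - 1));

    // ...while an intact payload does not.
    assert!(!is_partial_record(delimited_len, 64));
}
```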

            set_file_length(&data_file_path, initial_len, target_len)
                .await
                .expect("should not fail to truncate data file");

            // Now reopen the buffer, which should complete in a timely fashion without an immediate error.
            let reopen = timeout(
                Duration::from_millis(500),
                create_default_buffer_v2::<_, SizedRecord>(data_dir),
            )
            .await;
            assert!(
                reopen.is_ok(),
                "failed to reopen buffer in a timely fashion; likely deadlock"
            );
        }
    });

    let parent = trace_span!("reader_doesnt_block_from_partial_write_on_last_record");
    fut.instrument(parent.or_current()).await;
}
31 changes: 30 additions & 1 deletion lib/vector-buffers/src/variants/disk_v2/tests/mod.rs
@@ -5,7 +5,10 @@ use std::{
 };

 use async_trait::async_trait;
-use tokio::io::DuplexStream;
+use tokio::{
+    fs::OpenOptions,
+    io::{AsyncWriteExt, DuplexStream},
+};

 use super::{
     io::{AsyncFile, Metadata, ProductionFilesystem, ReadableMemoryMap, WritableMemoryMap},
@@ -22,6 +25,7 @@ type FilesystemUnderTest = ProductionFilesystem;

mod acknowledgements;
mod basic;
+mod initialization;
mod invariants;
mod known_errors;
mod model;
@@ -381,3 +385,28 @@
        .await
        .expect("read should produce a record")
}

pub(crate) async fn set_file_length<P: AsRef<Path>>(
    path: P,
    initial_len: u64,
    target_len: u64,
) -> io::Result<()> {
    let mut file = OpenOptions::new()
        .write(true)
        .open(&path)
        .await
        .expect("open should not fail");

    // Just to make sure the file matches the expected starting length before futzing with it.
    let metadata = file.metadata().await.expect("metadata should not fail");
    assert_eq!(initial_len, metadata.len());

    file.set_len(target_len)
        .await
        .expect("set_len should not fail");
    file.flush().await.expect("flush should not fail");
    file.sync_all().await.expect("sync should not fail");
    drop(file);

    Ok(())
}
}