Skip to content

Commit

Permalink
Reducing the number of call to fsync on the directory.
Browse files Browse the repository at this point in the history
This work by introducing a new API method in the Directory
trait.
The user needs to explicitely call this method.
(In particular, once before a commmit)

Closes #1225
  • Loading branch information
fulmicoton committed Dec 2, 2021
1 parent 03c2f6e commit eed568a
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 52 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ Tantivy 0.17
================================
- Change to non-strict schema. Ignore fields in data which are not defined in schema. Previously this returned an error. #1211
- Facets are necessarily indexed. Existing index with indexed facets should work out of the box. Index without facets that are marked with index: false should be broken (but they were already broken in a sense). (@fulmicoton) #1195 .
- Bugfix that could in theory impact durability in theory on some filesystems [#1224](https://github.com/quickwit-inc/tantivy/issues/1224)
- Reduce the number of fsync calls [#1225](https://github.com/quickwit-inc/tantivy/issues/1225)

Tantivy 0.16.2
================================
Expand Down
16 changes: 14 additions & 2 deletions src/directory/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,16 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// Opens a writer for the *virtual file* associated with
/// a Path.
///
/// Right after this call, the file should be created
/// and any subsequent call to `open_read` for the
/// Right after this call, for the duratio of the program
/// the file should be created and any subsequent call to `open_read` for the
/// same path should return a `FileSlice`.
///
/// However, depending on the directory implementation,
/// it might be required to call `sync_directory` to ensure
/// that the file is durably created.
/// (The semantics here are the same when dealing with
/// a posix filesystem.)
///
/// Write operations may be aggressively buffered.
/// The client of this trait is responsible for calling flush
/// to ensure that subsequent `read` operations
Expand Down Expand Up @@ -176,6 +182,12 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// The file may or may not previously exist.
fn atomic_write(&self, path: &Path, data: &[u8]) -> io::Result<()>;

/// Sync the directory.
///
/// This call is required to ensure that newly created files are
/// effectively stored durably.
fn sync_directory(&self) -> io::Result<()>;

/// Acquire a lock in the given directory.
///
/// The method is blocking or not depending on the `Lock` object.
Expand Down
22 changes: 20 additions & 2 deletions src/directory/managed_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ impl ManagedDirectory {
for delete_file in &deleted_files {
managed_paths_write.remove(delete_file);
}
self.directory.sync_directory()?;
save_managed_paths(self.directory.as_mut(), &meta_informations_wlock)?;
}

Expand Down Expand Up @@ -222,9 +223,21 @@ impl ManagedDirectory {
.write()
.expect("Managed file lock poisoned");
let has_changed = meta_wlock.managed_paths.insert(filepath.to_owned());
if has_changed {
save_managed_paths(self.directory.as_ref(), &meta_wlock)?;
if !has_changed {
return Ok(());
}
save_managed_paths(self.directory.as_ref(), &meta_wlock)?;
if meta_wlock.managed_paths.len() >= 1 {
// This is not the first file we add.
// `.managed.json` has been already properly created and we do not need to
// sync its parent directory.
//
// Note we do not try to create the managed.json file, upon the
// creation of the ManagedDirectory because it would prevent
// the usage of a ReadOnly directory.
return Ok(());
}
self.directory.sync_directory()?;
Ok(())
}

Expand Down Expand Up @@ -310,6 +323,11 @@ impl Directory for ManagedDirectory {
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
self.directory.watch(watch_callback)
}

fn sync_directory(&self) -> io::Result<()> {
self.directory.sync_directory()?;
Ok(())
}
}

impl Clone for ManagedDirectory {
Expand Down
85 changes: 40 additions & 45 deletions src/directory/mmap_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,33 +211,6 @@ impl MmapDirectory {
self.inner.root_path.join(relative_path)
}

/// Sync the root directory.
/// In certain FS, this is required to persistently create
/// a file.
fn sync_directory(&self) -> Result<(), io::Error> {
let mut open_opts = OpenOptions::new();

// Linux needs read to be set, otherwise returns EINVAL
// write must not be set, or it fails with EISDIR
open_opts.read(true);

// On Windows, opening a directory requires FILE_FLAG_BACKUP_SEMANTICS
// and calling sync_all() only works if write access is requested.
#[cfg(windows)]
{
use std::os::windows::fs::OpenOptionsExt;
use winapi::um::winbase;

open_opts
.write(true)
.custom_flags(winbase::FILE_FLAG_BACKUP_SEMANTICS);
}

let fd = open_opts.open(&self.inner.root_path)?;
fd.sync_data()?;
Ok(())
}

/// Returns some statistical information
/// about the Mmap cache.
///
Expand Down Expand Up @@ -367,22 +340,17 @@ impl Directory for MmapDirectory {
/// removed before the file is deleted.
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
let full_path = self.resolve_path(path);
match fs::remove_file(&full_path) {
Ok(_) => self.sync_directory().map_err(|e| DeleteError::IoError {
io_error: e,
filepath: path.to_path_buf(),
}),
Err(e) => {
if e.kind() == io::ErrorKind::NotFound {
Err(DeleteError::FileDoesNotExist(path.to_owned()))
} else {
Err(DeleteError::IoError {
io_error: e,
filepath: path.to_path_buf(),
})
fs::remove_file(&full_path).map_err(|e| {
if e.kind() == io::ErrorKind::NotFound {
DeleteError::FileDoesNotExist(path.to_owned())
} else {
DeleteError::IoError {
io_error: e,
filepath: path.to_path_buf(),
}
}
}
})?;
Ok(())
}

fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
Expand Down Expand Up @@ -411,10 +379,13 @@ impl Directory for MmapDirectory {
file.flush()
.map_err(|io_error| OpenWriteError::wrap_io_error(io_error, path.to_path_buf()))?;

// Apparetntly, on some filesystem syncing the parent
// directory is required.
self.sync_directory()
.map_err(|io_err| OpenWriteError::wrap_io_error(io_err, path.to_path_buf()))?;
// Note we actually do not sync the parent directory here.
//
// A newly created file, may, in some case, be created and even flushed to disk.
// and then lost...
//
// The file will only be durably written after we terminate AND
// sync_directory() is called.

let writer = SafeFileWriter::new(file);
Ok(BufWriter::new(Box::new(writer)))
Expand Down Expand Up @@ -470,6 +441,30 @@ impl Directory for MmapDirectory {
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
Ok(self.inner.watch(watch_callback))
}

fn sync_directory(&self) -> Result<(), io::Error> {
let mut open_opts = OpenOptions::new();

// Linux needs read to be set, otherwise returns EINVAL
// write must not be set, or it fails with EISDIR
open_opts.read(true);

// On Windows, opening a directory requires FILE_FLAG_BACKUP_SEMANTICS
// and calling sync_all() only works if write access is requested.
#[cfg(windows)]
{
use std::os::windows::fs::OpenOptionsExt;
use winapi::um::winbase;

open_opts
.write(true)
.custom_flags(winbase::FILE_FLAG_BACKUP_SEMANTICS);
}

let fd = open_opts.open(&self.inner.root_path)?;
fd.sync_all()?;
Ok(())
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions src/directory/ram_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ impl Directory for RamDirectory {
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
Ok(self.fs.write().unwrap().watch(watch_callback))
}

fn sync_directory(&self) -> io::Result<()> {
Ok(())
}
}

#[cfg(test)]
Expand Down
5 changes: 4 additions & 1 deletion src/indexer/segment_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ pub fn save_new_metas(
payload: None,
},
directory,
)
)?;
directory.sync_directory()?;
Ok(())
}

/// Save the index meta file.
Expand All @@ -82,6 +84,7 @@ fn save_metas(metas: &IndexMeta, directory: &dyn Directory) -> crate::Result<()>
io::ErrorKind::Other,
msg.unwrap_or_else(|| "Undefined".to_string())
))));
directory.sync_directory()?;
directory.atomic_write(&META_FILEPATH, &buffer[..])?;
debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas));
Ok(())
Expand Down
5 changes: 3 additions & 2 deletions src/store/compression_lz4_block.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::io::{self};

use core::convert::TryInto;
use std::mem;
use lz4_flex::{compress_into, decompress_into};
use std::mem;

#[inline]
#[allow(clippy::uninit_vec)]
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
compressed.clear();
let maximum_ouput_size = mem::size_of::<u32>() + lz4_flex::block::get_maximum_output_size(uncompressed.len());
let maximum_ouput_size =
mem::size_of::<u32>() + lz4_flex::block::get_maximum_output_size(uncompressed.len());
compressed.reserve(maximum_ouput_size);
unsafe {
compressed.set_len(maximum_ouput_size);
Expand Down

0 comments on commit eed568a

Please sign in to comment.