Skip to content

Commit

Permalink
fix: fixed handling of read errors in indexing routine (#234)
Browse files Browse the repository at this point in the history
  • Loading branch information
pamburus authored May 3, 2024
1 parent 90dbdf4 commit 361a2a5
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 9 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ members = [".", "crate/encstr"]
[workspace.package]
repository = "https://github.com/pamburus/hl"
authors = ["Pavel Ivanov <mr.pavel.ivanov@gmail.com>"]
version = "0.29.0-alpha.5"
version = "0.29.0-alpha.6"
edition = "2021"
license = "MIT"

Expand Down
120 changes: 114 additions & 6 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ impl Indexer {
let mut output = match File::create(&index_path) {
Ok(output) => output,
Err(err) => {
return Err(Error::FailedToOpenFileForReading {
return Err(Error::FailedToOpenFileForWriting {
path: index_path.clone(),
source: err,
});
Expand Down Expand Up @@ -350,8 +350,8 @@ impl Indexer {
}
}));
}
// spawn writer thread
let writer = scope.spawn(move |_| -> Result<Index> {
// spawn builder thread
let builder = scope.spawn(move |_| -> Result<Index> {
let bs = usize::try_from(self.buffer_size)?;
let mut index = Index {
source: SourceFile {
Expand Down Expand Up @@ -384,12 +384,13 @@ impl Indexer {
}
sn += 1;
}
index.save(output)?;
Ok(index)
});
// collect errors from reader and writer threads
// collect errors from reader and builder threads
reader.join().unwrap()?;
writer.join().unwrap()
let index = builder.join().unwrap()?;
index.save(output)?;
Ok(index)
})
.unwrap()
}
Expand Down Expand Up @@ -1003,3 +1004,110 @@ const CURRENT_VERSION: u64 = 1;
xx111111 - [*]seconds in next 64/80/88/96 bits
---
*/

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_process_file_success() {
use std::io::Cursor;
let indexer = Indexer::new(
1,
PathBuf::from("/tmp/cache"),
IndexerSettings::new(
NonZeroU32::new(1024).unwrap(),
NonZeroU32::new(1024).unwrap(),
&PredefinedFields::default(),
Delimiter::default(),
false,
None,
None,
),
);
let data = concat!(
"ts=2023-12-04T10:01:07.091243+01:00 msg=msg1\n",
"ts=2023-12-04T10:01:07.091252+01:00 msg=msg2\n",
"ts=2023-12-04T10:01:07.091633+01:00 msg=msg3\n",
);
let mut input = Cursor::new(data);
let mut output = Cursor::new(Vec::new());
let index = indexer
.process_file(
&PathBuf::from("/tmp/test.log"),
Metadata {
len: data.len() as u64,
modified: (1714739340, 0),
},
&mut input,
&mut output,
None,
)
.unwrap();
assert_ne!(output.into_inner().len(), 0);
assert_eq!(index.source.size, data.len() as u64);
assert_eq!(index.source.path, "/tmp/test.log");
assert_eq!(index.source.modified, (1714739340, 0));
assert_eq!(index.source.stat.lines_valid, 3);
assert_eq!(index.source.stat.lines_invalid, 0);
assert_eq!(index.source.stat.flags, schema::FLAG_HAS_TIMESTAMPS);
assert_eq!(
index.source.stat.ts_min_max,
Some((
Timestamp::from((1701680467, 91243000)),
Timestamp::from((1701680467, 91633000))
))
);
assert_eq!(index.source.blocks.len(), 1);
assert_eq!(index.source.blocks[0].stat.lines_valid, 3);
assert_eq!(index.source.blocks[0].stat.lines_invalid, 0);
assert_eq!(index.source.blocks[0].stat.flags, schema::FLAG_HAS_TIMESTAMPS);
assert_eq!(
index.source.blocks[0].stat.ts_min_max,
Some((
Timestamp::from((1701680467, 91243000)),
Timestamp::from((1701680467, 91633000))
))
);
}

#[test]
fn test_process_file_error() {
use std::io::Cursor;
let indexer = Indexer::new(
1,
PathBuf::from("/tmp/cache"),
IndexerSettings::new(
NonZeroU32::new(1024).unwrap(),
NonZeroU32::new(1024).unwrap(),
&PredefinedFields::default(),
Delimiter::default(),
false,
None,
None,
),
);
let mut input = FailingReader;
let mut output = Cursor::new(Vec::new());
let result = indexer.process_file(
&PathBuf::from("/tmp/test.log"),
Metadata {
len: 135,
modified: (1714739340, 0),
},
&mut input,
&mut output,
None,
);
assert_eq!(result.is_err(), true);
assert_eq!(output.into_inner().len(), 0);
}

struct FailingReader;

impl Read for FailingReader {
fn read(&mut self, _: &mut [u8]) -> std::io::Result<usize> {
Err(std::io::Error::new(std::io::ErrorKind::Other, "read error"))
}
}
}

0 comments on commit 361a2a5

Please sign in to comment.