Skip to content

Commit

Permalink
Merge pull request #108 from pamburus/feature/issue/105/a
Browse files Browse the repository at this point in the history
new: Added support for an arbitrary prefix before JSON message
  • Loading branch information
pamburus authored Dec 14, 2023
2 parents 734942e + 75526cd commit 2069700
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ categories = ["command-line-utilities"]
description = "Utility for viewing json-formatted log files."
keywords = ["cli", "human", "log"]
name = "hl"
version = "0.22.1"
version = "0.23.0"
edition = "2021"
build = "build.rs"

Expand Down
39 changes: 23 additions & 16 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,29 +267,25 @@ impl App {
for (rxp, txw) in izip!(rxp, txw) {
workers.push(scope.spawn(closure!(ref parser, |_| -> Result<()> {
let formatter = self.formatter();
let mut processor = SegmentProcessor::new(&parser, &formatter, &self.options.filter);
for (block, ts_min, i, j) in rxp.iter() {
let mut buf = Vec::with_capacity(2 * usize::try_from(block.size())?);
let mut items = Vec::with_capacity(2 * usize::try_from(block.lines_valid())?);
for line in block.into_lines()? {
if line.len() == 0 {
continue;
}
if let Ok(record) = json::from_slice(line.bytes()) {
let record = parser.parse(record);
if record.matches(&self.options.filter) {
let offset = buf.len();
formatter.format_record(&mut buf, record.with_source(line.bytes()));
if let Some(ts) = record.ts {
if let Some(unix_ts) = ts.unix_utc() {
items.push((unix_ts.into(), offset..buf.len()));
} else {
eprintln!("skipped message because timestamp cannot be parsed: {:#?}", ts)
}
processor.run(line.bytes(), &mut buf, "", &mut |record: &Record, location: Range<usize>|{
if let Some(ts) = &record.ts {
if let Some(unix_ts) = ts.unix_utc() {
items.push((unix_ts.into(), location));
} else {
eprintln!("skipped message with missing timestamp")
eprintln!("skipped message because timestamp cannot be parsed: {:#?}", ts)
}
} else {
eprintln!("skipped message with missing timestamp")
}
}
});
}

let buf = Arc::new(buf);
Expand Down Expand Up @@ -705,12 +701,15 @@ impl<'a, F: RecordWithSourceFormatter> SegmentProcessor<'a, F> {
if data.len() == 0 {
continue;
}
let stream = json::Deserializer::from_slice(data).into_iter::<RawRecord>();
let extra_prefix = data.split(|c|*c==b'{').next().unwrap();
let xn = extra_prefix.len();
let json_data = &data[xn..];
let stream = json::Deserializer::from_slice(json_data).into_iter::<RawRecord>();
let mut stream = StreamDeserializerWithOffsets(stream);
let mut some = false;
while let Some(Ok((record, offsets))) = stream.next() {
some = true;
let record = self.parser.parse(record);
let record = self.parser.parse(record).with_prefix(extra_prefix);
if record.matches(self.filter) {
let begin = buf.len();
buf.extend(prefix.as_bytes());
Expand All @@ -719,7 +718,7 @@ impl<'a, F: RecordWithSourceFormatter> SegmentProcessor<'a, F> {
observer.observe_record(&record, begin..end);
}
}
let remainder = if some { &data[stream.0.byte_offset()..] } else { data };
let remainder = if some { &data[xn+stream.0.byte_offset()..] } else { data };
if remainder.len() != 0 && self.filter.is_empty() {
buf.extend_from_slice(remainder);
buf.push(b'\n');
Expand Down Expand Up @@ -758,6 +757,14 @@ impl RecordObserver for TimestampIndexBuilder {

// ---

impl<T: FnMut(&Record, Range<usize>)> RecordObserver for T {
fn observe_record<'b>(&mut self, record: &'b Record<'b>, location: Range<usize>) {
self(record, location)
}
}

// ---

struct TimestampIndex {
block: usize,
lines: Vec<TimestampIndexLine>,
Expand Down
10 changes: 10 additions & 0 deletions src/formatting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ impl RecordFormatter {
}

pub fn format_record(&self, buf: &mut Buf, rec: &model::Record) {
if let Some(prefix) = rec.prefix {
if prefix.len() != 0 {
buf.extend_from_slice(prefix);
if prefix[prefix.len() - 1] != b' ' {
buf.push(b' ');
}
}
}

self.theme.apply(buf, &rec.level, |s| {
//
// time
Expand Down Expand Up @@ -500,6 +509,7 @@ mod tests {
fn test_nested_objects() {
assert_eq!(
format(&Record {
prefix: None,
ts: Some(Timestamp::new("2000-01-02T03:04:05.123Z", None)),
message: Some(RawValue::from_string(r#""tm""#.into()).unwrap().as_ref()),
level: Some(Level::Debug),
Expand Down
2 changes: 2 additions & 0 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ impl Indexer {
let data = strip(data, b'\r');
let mut ts = None;
if data.len() != 0 {
let prefix = data.split(|c| *c == b'{').next().unwrap();
let data = &data[prefix.len()..];
match json::from_slice::<RawRecord>(data) {
Ok(rec) => {
let rec = self.parser.parse(rec);
Expand Down
8 changes: 8 additions & 0 deletions src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub use level::Level;
// ---

pub struct Record<'a> {
pub prefix: Option<&'a [u8]>,
pub ts: Option<Timestamp<'a>>,
pub message: Option<&'a RawValue>,
pub level: Option<Level>,
Expand All @@ -44,8 +45,15 @@ impl<'a> Record<'a> {
filter.apply(self)
}

pub fn with_prefix(mut self, prefix: &'a [u8]) -> Self {
self.prefix = Some(prefix);

return self;
}

fn with_capacity(capacity: usize) -> Self {
Self {
prefix: None,
ts: None,
message: None,
level: None,
Expand Down

0 comments on commit 2069700

Please sign in to comment.