Skip to content

Commit

Permalink
Use hash_map::Entry
Browse files Browse the repository at this point in the history
Signed-off-by: MOZGIII <mike-n@narod.ru>
  • Loading branch information
MOZGIII committed Feb 20, 2020
1 parent 1062684 commit 93a5a26
Showing 1 changed file with 71 additions and 95 deletions.
166 changes: 71 additions & 95 deletions src/sources/file/line_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use bytes::{Bytes, BytesMut};
use futures::{Async, Poll, Stream};
use regex::bytes::Regex;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::collections::{hash_map::Entry, HashMap, VecDeque};
use std::hash::Hash;
use std::time::Duration;
use tokio::timer::DelayQueue;
Expand Down Expand Up @@ -174,106 +174,82 @@ where
/// Handle line, if we have something to output - return it.
fn handle_line(&mut self, line: Bytes, src: K) -> Option<(Bytes, K)> {
// Check if we already have the buffered data for the source.
if self.buffers.contains_key(&src) {
let condition_matched = self.config.condition_pattern.is_match(line.as_ref());
match self.config.mode {
// All consecutive lines matching this pattern are included in
// the group.
Mode::ContinueThrough => {
if condition_matched {
let buffered = self
.buffers
.get_mut(&src)
.expect("already asserted key is present");
buffered.extend_from_slice(b"\n");
buffered.extend_from_slice(&line);
return None;
} else {
let buffered = self
.buffers
.insert(src.clone(), line.into())
.expect("already asserted key is present");
return Some((buffered.freeze(), src));
match self.buffers.entry(src) {
Entry::Occupied(mut entry) => {
let condition_matched = self.config.condition_pattern.is_match(line.as_ref());
match self.config.mode {
// All consecutive lines matching this pattern are included in
// the group.
Mode::ContinueThrough => {
if condition_matched {
let buffered = entry.get_mut();
buffered.extend_from_slice(b"\n");
buffered.extend_from_slice(&line);
return None;
} else {
let buffered = entry.insert(line.into());
return Some((buffered.freeze(), entry.key().clone()));
}
}
}
// All consecutive lines matching this pattern, plus one
// additional line, are included in the group.
Mode::ContinuePast => {
if condition_matched {
let buffered = self
.buffers
.get_mut(&src)
.expect("already asserted key is present");
buffered.extend_from_slice(b"\n");
buffered.extend_from_slice(&line);
return None;
} else {
let mut buffered = self
.buffers
.remove(&src)
.expect("already asserted key is present");
buffered.extend_from_slice(b"\n");
buffered.extend_from_slice(&line);
return Some((buffered.freeze(), src));
// All consecutive lines matching this pattern, plus one
// additional line, are included in the group.
Mode::ContinuePast => {
if condition_matched {
let buffered = entry.get_mut();
buffered.extend_from_slice(b"\n");
buffered.extend_from_slice(&line);
return None;
} else {
let (src, mut buffered) = entry.remove_entry();
buffered.extend_from_slice(b"\n");
buffered.extend_from_slice(&line);
return Some((buffered.freeze(), src));
}
}
}
// All consecutive lines not matching this pattern are included
// in the group.
Mode::HaltBefore => {
if condition_matched {
let buffered = self
.buffers
.insert(src.clone(), line.into())
.expect("already asserted key is present");
return Some((buffered.freeze(), src));
} else {
let buffered = self
.buffers
.get_mut(&src)
.expect("already asserted key is present");
buffered.extend_from_slice(b"\n");
buffered.extend_from_slice(&line);
return None;
// All consecutive lines not matching this pattern are included
// in the group.
Mode::HaltBefore => {
if condition_matched {
let buffered = entry.insert(line.into());
return Some((buffered.freeze(), entry.key().clone()));
} else {
let buffered = entry.get_mut();
buffered.extend_from_slice(b"\n");
buffered.extend_from_slice(&line);
return None;
}
}
}
// All consecutive lines, up to and including the first line
// matching this pattern, are included in the group.
Mode::HaltWith => {
if condition_matched {
let mut buffered = self
.buffers
.remove(&src)
.expect("already asserted key is present");
buffered.extend_from_slice(b"\n");
buffered.extend_from_slice(&line);
return Some((buffered.freeze(), src));
} else {
let buffered = self
.buffers
.get_mut(&src)
.expect("already asserted key is present");
buffered.extend_from_slice(b"\n");
buffered.extend_from_slice(&line);
return None;
// All consecutive lines, up to and including the first line
// matching this pattern, are included in the group.
Mode::HaltWith => {
if condition_matched {
let (src, mut buffered) = entry.remove_entry();
buffered.extend_from_slice(b"\n");
buffered.extend_from_slice(&line);
return Some((buffered.freeze(), src));
} else {
let buffered = entry.get_mut();
buffered.extend_from_slice(b"\n");
buffered.extend_from_slice(&line);
return None;
}
}
}
}
}

// We reached this code, this means the incoming line, whatever it was,
// was not consumed as a result of the condition matching.
// This line is a candidate for buffering, or passing through.
if self.config.start_pattern.is_match(line.as_ref()) {
// It was indeed a new line we need to filter.
// Set the timeout and buffer this line.
self.timeouts
.insert(src.clone(), self.config.timeout.clone());
let buffered = self.buffers.insert(src, line.into());
debug_assert!(buffered.is_none(), "do not throw away the data");
return None;
} else {
// It's just a regular line we don't really care about.
return Some((line, src));
Entry::Vacant(entry) => {
// This line is a candidate for buffering, or passing through.
if self.config.start_pattern.is_match(line.as_ref()) {
// It was indeed a new line we need to filter.
// Set the timeout and buffer this line.
self.timeouts
.insert(entry.key().clone(), self.config.timeout.clone());
entry.insert(line.into());
return None;
} else {
// It's just a regular line we don't really care about.
return Some((line, entry.into_key()));
}
}
}
}
}
Expand Down

0 comments on commit 93a5a26

Please sign in to comment.