diff --git a/src/sources/file/line_agg.rs b/src/sources/file/line_agg.rs index e8fe7acb4c550..477e9aebe7289 100644 --- a/src/sources/file/line_agg.rs +++ b/src/sources/file/line_agg.rs @@ -2,7 +2,7 @@ use bytes::{Bytes, BytesMut}; use futures::{Async, Poll, Stream}; use regex::bytes::Regex; use serde::{Deserialize, Serialize}; -use std::collections::{HashMap, VecDeque}; +use std::collections::{hash_map::Entry, HashMap, VecDeque}; use std::hash::Hash; use std::time::Duration; use tokio::timer::DelayQueue; @@ -174,106 +174,82 @@ where /// Handle line, if we have something to output - return it. fn handle_line(&mut self, line: Bytes, src: K) -> Option<(Bytes, K)> { // Check if we already have the buffered data for the source. - if self.buffers.contains_key(&src) { - let condition_matched = self.config.condition_pattern.is_match(line.as_ref()); - match self.config.mode { - // All consecutive lines matching this pattern are included in - // the group. - Mode::ContinueThrough => { - if condition_matched { - let buffered = self - .buffers - .get_mut(&src) - .expect("already asserted key is present"); - buffered.extend_from_slice(b"\n"); - buffered.extend_from_slice(&line); - return None; - } else { - let buffered = self - .buffers - .insert(src.clone(), line.into()) - .expect("already asserted key is present"); - return Some((buffered.freeze(), src)); + match self.buffers.entry(src) { + Entry::Occupied(mut entry) => { + let condition_matched = self.config.condition_pattern.is_match(line.as_ref()); + match self.config.mode { + // All consecutive lines matching this pattern are included in + // the group. + Mode::ContinueThrough => { + if condition_matched { + let buffered = entry.get_mut(); + buffered.extend_from_slice(b"\n"); + buffered.extend_from_slice(&line); + return None; + } else { + let buffered = entry.insert(line.into()); + return Some((buffered.freeze(), entry.key().clone())); + } } - } - // All consecutive lines matching this pattern, plus one - // additional line, are included in the group. - Mode::ContinuePast => { - if condition_matched { - let buffered = self - .buffers - .get_mut(&src) - .expect("already asserted key is present"); - buffered.extend_from_slice(b"\n"); - buffered.extend_from_slice(&line); - return None; - } else { - let mut buffered = self - .buffers - .remove(&src) - .expect("already asserted key is present"); - buffered.extend_from_slice(b"\n"); - buffered.extend_from_slice(&line); - return Some((buffered.freeze(), src)); + // All consecutive lines matching this pattern, plus one + // additional line, are included in the group. + Mode::ContinuePast => { + if condition_matched { + let buffered = entry.get_mut(); + buffered.extend_from_slice(b"\n"); + buffered.extend_from_slice(&line); + return None; + } else { + let (src, mut buffered) = entry.remove_entry(); + buffered.extend_from_slice(b"\n"); + buffered.extend_from_slice(&line); + return Some((buffered.freeze(), src)); + } } - } - // All consecutive lines not matching this pattern are included - // in the group. - Mode::HaltBefore => { - if condition_matched { - let buffered = self - .buffers - .insert(src.clone(), line.into()) - .expect("already asserted key is present"); - return Some((buffered.freeze(), src)); - } else { - let buffered = self - .buffers - .get_mut(&src) - .expect("already asserted key is present"); - buffered.extend_from_slice(b"\n"); - buffered.extend_from_slice(&line); - return None; + // All consecutive lines not matching this pattern are included + // in the group. + Mode::HaltBefore => { + if condition_matched { + let buffered = entry.insert(line.into()); + return Some((buffered.freeze(), entry.key().clone())); + } else { + let buffered = entry.get_mut(); + buffered.extend_from_slice(b"\n"); + buffered.extend_from_slice(&line); + return None; + } } - } - // All consecutive lines, up to and including the first line - // matching this pattern, are included in the group. - Mode::HaltWith => { - if condition_matched { - let mut buffered = self - .buffers - .remove(&src) - .expect("already asserted key is present"); - buffered.extend_from_slice(b"\n"); - buffered.extend_from_slice(&line); - return Some((buffered.freeze(), src)); - } else { - let buffered = self - .buffers - .get_mut(&src) - .expect("already asserted key is present"); - buffered.extend_from_slice(b"\n"); - buffered.extend_from_slice(&line); - return None; + // All consecutive lines, up to and including the first line + // matching this pattern, are included in the group. + Mode::HaltWith => { + if condition_matched { + let (src, mut buffered) = entry.remove_entry(); + buffered.extend_from_slice(b"\n"); + buffered.extend_from_slice(&line); + return Some((buffered.freeze(), src)); + } else { + let buffered = entry.get_mut(); + buffered.extend_from_slice(b"\n"); + buffered.extend_from_slice(&line); + return None; + } } } } - } - - // We reached this code, this means the incoming line, whatever it was, - // was not consumed as a result of the condition matching. - // This line is a candidate for buffering, or passing through. - if self.config.start_pattern.is_match(line.as_ref()) { - // It was indeed a new line we need to filter. - // Set the timeout and buffer this line. - self.timeouts - .insert(src.clone(), self.config.timeout.clone()); - let buffered = self.buffers.insert(src, line.into()); - debug_assert!(buffered.is_none(), "do not throw away the data"); - return None; - } else { - // It's just a regular line we don't really care about. - return Some((line, src)); + Entry::Vacant(entry) => { + // This line is a candidate for buffering, or passing through. + if self.config.start_pattern.is_match(line.as_ref()) { + // It was indeed a new line we need to filter. + // Set the timeout and buffer this line. + self.timeouts + .insert(entry.key().clone(), self.config.timeout.clone()); + entry.insert(line.into()); + return None; + } else { + // It's just a regular line we don't really care about. + return Some((line, entry.into_key())); + } + } } } }