Slip ready_chunks into Transform::Function processing
This commit introduces a `ready_chunks` combinator into the pipeline for function
transforms. This combinator polls the underlying stream until either it signals
Pending or the chunk limit is reached, sending a vector of items downstream. In
this commit we pull at most 128 `Event` instances off the input stream for
processing, improving locality.
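
To make the chunking behaviour concrete, here is a minimal std-only analog, not the `futures` combinator itself. It assumes a `std::sync::mpsc` channel in place of the async input stream, and the helper name `drain_ready` is hypothetical. An empty channel plays the role of `Pending`. Unlike `ready_chunks`, which waits for at least one item, this sketch returns an empty vector when nothing is queued.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Pull up to `cap` items that are already queued on `rx`, without blocking.
/// Stops early when the channel is momentarily empty (the analog of a stream
/// signalling `Pending`) or when the sender has disconnected.
fn drain_ready<T>(rx: &Receiver<T>, cap: usize) -> Vec<T> {
    let mut chunk = Vec::with_capacity(cap);
    while chunk.len() < cap {
        match rx.try_recv() {
            Ok(item) => chunk.push(item),
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
        }
    }
    chunk
}
```

With five items queued and a cap of three, the first call would return three items and the second the remaining two, which mirrors how the 128-item cap bounds each batch handed to the transform.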

I have not done the same for task transforms, though I think it could be
done. Without changing the trait type we'd need the inverse of `ready_chunks` to
move from a `Stream<Item = Vec<Event>>` into a `Stream<Item = Event>`.
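
The "inverse of `ready_chunks`" mentioned above amounts to flattening. A rough iterator analog is sketched below; `unchunk` is a hypothetical name, and plain iterators stand in for streams. On a `futures` stream the same shape could likely be had with `flat_map(stream::iter)`, which is the pattern the function-transform path already uses, but that is an assumption about how a task-transform version would be written.

```rust
/// Iterator analog of "un-chunking": turn a sequence of `Vec<T>` back into a
/// sequence of `T`, preserving order. Empty chunks contribute nothing.
fn unchunk<T>(chunks: impl IntoIterator<Item = Vec<T>>) -> impl Iterator<Item = T> {
    chunks.into_iter().flatten()
}
```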

This is worth +7MB/s in #8512.

Signed-off-by: Brian L. Troutwine <brian@troutwine.us>
blt committed Sep 18, 2021
1 parent a32c7fd commit 74c5ffa
Showing 1 changed file with 16 additions and 10 deletions.
26 changes: 16 additions & 10 deletions src/topology/builder.rs
@@ -178,20 +178,26 @@ pub async fn build_pieces(
             let transform = match transform {
                 Transform::Function(mut t) => input_rx
                     .filter(move |event| ready(filter_event_type(event, input_type)))
-                    .inspect(|event| {
+                    .ready_chunks(128) // 128 is an arbitrary, smallish constant
+                    .inspect(|events| {
                         emit!(EventsReceived {
-                            count: 1,
-                            byte_size: event.size_of(),
-                        })
+                            count: events.len(),
+                            byte_size: events.iter().map(|e| e.size_of()).sum(),
+                        });
                     })
-                    .flat_map(move |v| {
-                        let mut buf = Vec::with_capacity(1);
-                        t.transform(&mut buf, v);
+                    .flat_map(move |events| {
+                        let mut output = Vec::with_capacity(events.len());
+                        let mut buf = Vec::with_capacity(4); // also an arbitrary,
+                                                             // smallish constant
+                        for v in events {
+                            t.transform(&mut buf, v);
+                            output.append(&mut buf);
+                        }
                         emit!(EventsSent {
-                            count: buf.len(),
-                            byte_size: buf.iter().map(|event| event.size_of()).sum(),
+                            count: output.len(),
+                            byte_size: output.iter().map(|event| event.size_of()).sum(),
                         });
-                        stream::iter(buf.into_iter()).map(Ok)
+                        stream::iter(output.into_iter()).map(Ok)
                     })
                     .forward(output)
                     .boxed(),
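
The batched loop in the diff above can be exercised in isolation. The sketch below is a simplified stand-in, not the project's actual types: the `FnTransform` trait, the `DupEvens` transform, and the `transform_chunk` helper are all hypothetical, and `u32` replaces `Event`. It shows why a single scratch `buf` can be reused across items: `Vec::append` moves the elements out and leaves `buf` empty for the next call.

```rust
/// Hypothetical stand-in for a function transform: it may emit zero or more
/// outputs per input by pushing into `out`.
trait FnTransform<T> {
    fn transform(&mut self, out: &mut Vec<T>, item: T);
}

/// Toy transform (an assumption for illustration): drops odd numbers and
/// emits each even number twice.
struct DupEvens;

impl FnTransform<u32> for DupEvens {
    fn transform(&mut self, out: &mut Vec<u32>, item: u32) {
        if item % 2 == 0 {
            out.push(item);
            out.push(item);
        }
    }
}

/// Apply `t` to every item of a chunk, reusing one scratch buffer.
fn transform_chunk<T>(t: &mut impl FnTransform<T>, chunk: Vec<T>) -> Vec<T> {
    let mut output = Vec::with_capacity(chunk.len());
    let mut buf = Vec::with_capacity(4); // small scratch buffer, as in the diff
    for item in chunk {
        t.transform(&mut buf, item);
        // `append` moves everything out of `buf`, leaving it empty so the
        // next iteration starts from a clean scratch buffer.
        output.append(&mut buf);
    }
    output
}
```

Because the output of a chunk is collected before anything is sent downstream, the sent-events metric can be emitted once per chunk rather than once per event, which is the same trade the commit makes.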