Skip to content

Commit

Permalink
PERFORMANCE: filter_func can optionally take an array of events to ma…
Browse files Browse the repository at this point in the history
…ke batched filters much faster

Fixes #8428

Fixes #8444

Fixes #8445
  • Loading branch information
original-brownbear committed Oct 6, 2017
1 parent e09ab28 commit 63a05c0
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 11 deletions.
2 changes: 1 addition & 1 deletion logstash-core/lib/logstash/config/config_ast.rb
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def compile
# of the output/filter function
definitions << "define_singleton_method :#{type}_func do |event|"
definitions << " targeted_outputs = []" if type == "output"
definitions << " events = [event]" if type == "filter"
definitions << " events = event" if type == "filter"
definitions << " @logger.debug? && @logger.debug(\"#{type} received\", \"event\" => event.to_hash)"

sections.select { |s| s.plugin_type.text_value == type }.each do |s|
Expand Down
10 changes: 4 additions & 6 deletions logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -395,11 +395,9 @@ def worker_loop(batch_size, batch_delay)
end

def filter_batch(batch)
batch.each do |event|
filter_func(event).each do |e|
#these are both original and generated events
batch.merge(e) unless e.cancelled?
end
filter_func(batch.to_a).each do |e|
#these are both original and generated events
batch.merge(e) unless e.cancelled?
end
@filter_queue_client.add_filtered_metrics(batch)
@events_filtered.increment(batch.size)
Expand Down Expand Up @@ -539,7 +537,7 @@ def shutdown_workers
# in the pipeline anymore.
def filter(event, &block)
# filter_func returns all filtered events, including cancelled ones
filter_func(event).each { |e| block.call(e) }
filter_func([event]).each {|e| block.call(e)}
end


Expand Down
6 changes: 6 additions & 0 deletions logstash-core/lib/logstash/util/wrapped_acked_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,12 @@ def cancel(event)
# @cancelled[event] = true
end

def to_a
events = []
each {|e| events << e}
events
end

def each(&blk)
# take care not to cause @originals or @generated to change during iteration

Expand Down
6 changes: 6 additions & 0 deletions logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ def cancel(event)
# @cancelled[event] = true
end

def to_a
events = []
each {|e| events << e}
events
end

def each(&blk)
# take care not to cause @originals or @generated to change during iteration
@iterating = true
Expand Down
8 changes: 4 additions & 4 deletions logstash-core/spec/logstash/pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -585,9 +585,9 @@ class TestPipeline < LogStash::Pipeline

it "should handle evaluating different config" do
expect(pipeline1.output_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline1.filter_func([LogStash::Event.new])).not_to include(nil)
expect(pipeline2.output_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline1.filter_func([LogStash::Event.new])).not_to include(nil)
end
end

Expand Down Expand Up @@ -668,9 +668,9 @@ class TestPipeline < LogStash::Pipeline
# in the current instance and was returning an array containing nil values for
# the match.
expect(pipeline1.output_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline1.filter_func([LogStash::Event.new])).not_to include(nil)
expect(pipeline2.output_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline1.filter_func([LogStash::Event.new])).not_to include(nil)
end
end

Expand Down

0 comments on commit 63a05c0

Please sign in to comment.