Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PERFORMANCE: filter_func can optionally take an array of events to make batched filters much faster #8444

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion logstash-core/lib/logstash/config/config_ast.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def compile
# of the output/filter function
definitions << "define_singleton_method :#{type}_func do |event|"
definitions << " targeted_outputs = []" if type == "output"
definitions << " events = [event]" if type == "filter"
definitions << " events = event" if type == "filter"
definitions << " @logger.debug? && @logger.debug(\"#{type} received\", \"event\" => event.to_hash)"

sections.select { |s| s.plugin_type.text_value == type }.each do |s|
Expand Down
12 changes: 4 additions & 8 deletions logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -498,13 +498,9 @@ def worker_loop(batch_size, batch_delay)
end

def filter_batch(batch)
batch.each do |event|
return if @force_shutdown.true?

filter_func(event).each do |e|
#these are both original and generated events
batch.merge(e) unless e.cancelled?
end
filter_func(batch.to_a).each do |e|
#these are both original and generated events
batch.merge(e) unless e.cancelled?
end
@filter_queue_client.add_filtered_metrics(batch)
@events_filtered.increment(batch.size)
Expand Down Expand Up @@ -662,7 +658,7 @@ def shutdown_workers
# Passes one event to filter_func and calls the given block once for each
# element that filter_func returns.
#
# NOTE(review): this span is a diff hunk rendered without +/- markers. The two
# filter_func lines below look like the removed form (bare event) and the added
# form (event wrapped in an array) of the same statement, not two consecutive
# calls -- confirm against the real file before relying on this text.
def filter(event, &block)
maybe_setup_out_plugins
# filter_func returns all filtered events, including cancelled ones
filter_func(event).each {|e| block.call(e)}
filter_func([event]).each {|e| block.call(e)}
end

# perform filters flush and yield flushed event to the passed block
Expand Down
6 changes: 6 additions & 0 deletions logstash-core/lib/logstash/util/wrapped_acked_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,12 @@ def cancel(event)
# @cancelled[event] = true
end

# Builds a new Array holding every event that #each yields, in the order
# they are yielded.
#
# @return [Array] a fresh array of the yielded events
def to_a
  collected = []
  each do |event|
    collected.push(event)
  end
  collected
end

def each(&blk)
# take care not to cause @originals or @generated to change during iteration

Expand Down
6 changes: 6 additions & 0 deletions logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ def cancel(event)
# @cancelled[event] = true
end

# Returns a new Array containing each event yielded by #each, preserving
# the iteration order.
#
# @return [Array] a fresh array of the yielded events
def to_a
  [].tap do |collected|
    each { |event| collected << event }
  end
end

def each(&blk)
# take care not to cause @originals or @generated to change during iteration
@iterating = true
Expand Down
8 changes: 4 additions & 4 deletions logstash-core/spec/logstash/pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -618,9 +618,9 @@ class TestPipeline < LogStash::Pipeline

it "should handle evaluating different config" do
expect(pipeline1.output_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline1.filter_func([LogStash::Event.new])).not_to include(nil)
expect(pipeline2.output_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline1.filter_func([LogStash::Event.new])).not_to include(nil)
end
end

Expand Down Expand Up @@ -700,9 +700,9 @@ class TestPipeline < LogStash::Pipeline
# in the current instance and was returning an array containing nil values for
# the match.
expect(pipeline1.output_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline1.filter_func([LogStash::Event.new])).not_to include(nil)
expect(pipeline2.output_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline1.filter_func([LogStash::Event.new])).not_to include(nil)
end
end

Expand Down