-
Notifications
You must be signed in to change notification settings - Fork 97
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a filter plugin add_insert_ids. (#246)
- Loading branch information
1 parent
8d26465
commit 23cfc5b
Showing
4 changed files
with
267 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
# Copyright 2018 Google Inc. All rights reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
require 'fluent/plugin/filter' | ||
|
||
module Fluent | ||
module Plugin | ||
# Fluentd filter plugin for adding insertIds to guarantee log entry order | ||
# and uniqueness. | ||
# Sample log entries enriched by this plugin: | ||
# { | ||
# "timestamp": "2017-08-22 13:35:28", | ||
# "message": "1", | ||
# "logging.googleapis.com/insertId": "aye7eakuf23h41aef0" | ||
# } | ||
# { | ||
# "timestamp": "2017-08-22 13:35:28", | ||
# "message": "2", | ||
# "logging.googleapis.com/insertId": "aye7eakuf23h41aef1" | ||
# } | ||
# { | ||
# "timestamp": "2017-08-22 13:35:28", | ||
# "message": "3", | ||
# "logging.googleapis.com/insertId": "aye7eakuf23h41aef2" | ||
# } | ||
class AddInsertIdsFilter < Filter | ||
Fluent::Plugin.register_filter('add_insert_ids', self) | ||
|
||
# Constants for configuration. | ||
module ConfigConstants | ||
# The default field name of insertIds in the log entry. | ||
DEFAULT_INSERT_ID_KEY = 'logging.googleapis.com/insertId'.freeze | ||
# The character size of the insertIds. This matches the setup in the | ||
# Stackdriver Logging backend. | ||
INSERT_ID_SIZE = 17 | ||
# The characters that are allowed in the insertIds. This matches the | ||
# allowed collection by the Stackdriver Logging Backend. | ||
ALLOWED_CHARS = (Array(0..9) + Array('a'..'z')).freeze | ||
end | ||
|
||
include self::ConfigConstants | ||
|
||
desc 'The field name for insertIds in the log record.' | ||
config_param :insert_id_key, :string, default: DEFAULT_INSERT_ID_KEY | ||
|
||
# Expose attr_readers for testing. | ||
attr_reader :insert_id_key | ||
|
||
def start | ||
super | ||
@log = $log # rubocop:disable Style/GlobalVars | ||
|
||
# Initialize the insertID. | ||
@log.info "Started the add_insert_ids plugin with #{@insert_id_key}" \ | ||
' as the insert ID key.' | ||
@insert_id = generate_initial_insert_id | ||
@log.info "Initialized the insert ID key to #{@insert_id}." | ||
end | ||
|
||
def configure(conf) | ||
super | ||
end | ||
|
||
def shutdown | ||
super | ||
end | ||
|
||
# rubocop:disable Style/UnusedMethodArgument | ||
def filter(tag, time, record) | ||
# Only generate and add an insertId field if the record is a hash and | ||
# the insert ID field is not already set (or set to an empty string). | ||
if record.is_a?(Hash) && record[@insert_id_key].to_s.empty? | ||
record[@insert_id_key] = increment_insert_id | ||
end | ||
record | ||
end | ||
# rubocop:enable Style/UnusedMethodArgument | ||
|
||
private | ||
|
||
# Generate a random string as the initial insertId. | ||
def generate_initial_insert_id | ||
Array.new(INSERT_ID_SIZE) { ALLOWED_CHARS.sample }.join | ||
end | ||
|
||
# Increment the insertId and return the new value. | ||
def increment_insert_id | ||
@insert_id = @insert_id.next | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
# Copyright 2018 Google Inc. All rights reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
require_relative '../helper' | ||
|
||
require 'fluent/test/driver/filter' | ||
require 'fluent/plugin/filter_add_insert_ids' | ||
|
||
# Unit tests for filter_add_insert_ids plugin. | ||
class FilterAddInsertIdsTest < Test::Unit::TestCase | ||
include Fluent::Plugin::AddInsertIdsFilter::ConfigConstants | ||
|
||
CUSTOM_INSERT_ID_KEY = 'custom_insert_id_key'.freeze | ||
INSERT_ID = 'aeyr82r92h249gh9h'.freeze | ||
TEST_MESSAGE = 'test message for add_insert_ids plugin.'.freeze | ||
APPLICATION_DEFAULT_CONFIG = ''.freeze | ||
INSERT_ID_KEY_CONFIG = %( | ||
insert_id_key #{CUSTOM_INSERT_ID_KEY} | ||
).freeze | ||
|
||
def setup | ||
Fluent::Test.setup | ||
end | ||
|
||
def test_configure_insert_id_key | ||
{ | ||
APPLICATION_DEFAULT_CONFIG => DEFAULT_INSERT_ID_KEY, | ||
INSERT_ID_KEY_CONFIG => CUSTOM_INSERT_ID_KEY | ||
}.each do |config, insert_id_key| | ||
d = create_driver(config) | ||
assert_equal insert_id_key, d.instance.insert_id_key | ||
end | ||
end | ||
|
||
def test_add_insert_ids | ||
total_entry_count = 1000 | ||
d = create_driver | ||
d.run do | ||
total_entry_count.times do |index| | ||
d.emit(log_entry(index)) | ||
end | ||
end | ||
filtered_events = d.filtered_as_array | ||
|
||
assert_equal total_entry_count, filtered_events.size, | ||
"#{total_entry_count} log entries after filtering is" \ | ||
" expected. Only #{filtered_events.size} are detected." | ||
# The expected insertId will be assigned as we scan the first log entry. | ||
expected_insert_id = nil | ||
unique_insert_ids = Set.new | ||
filtered_events.each_with_index do |event, index| | ||
assert_equal 3, event.size, "Index #{index} failed. Log event should" \ | ||
' include 3 elements: tag, time and record.' | ||
record = event[2] | ||
assert_true record.is_a?(Hash), "Index #{index} failed. Log record" \ | ||
" #{record} should be a hash." | ||
assert_equal index, record['id'], "Index #{index} failed. Log entries" \ | ||
' should come in order.' | ||
assert_equal TEST_MESSAGE, record['message'], "Index #{index} failed." | ||
|
||
# Get the first insertID. | ||
expected_insert_id = record[DEFAULT_INSERT_ID_KEY] if index == 0 | ||
insert_id = record[DEFAULT_INSERT_ID_KEY] | ||
assert_equal expected_insert_id, insert_id, "Index #{index} failed." | ||
expected_insert_id = expected_insert_id.next | ||
assert_true insert_id < expected_insert_id, | ||
"Index #{index} failed. #{insert_id}" \ | ||
" < #{expected_insert_id} is false." | ||
unique_insert_ids << insert_id | ||
end | ||
assert_equal total_entry_count, unique_insert_ids.size, | ||
"Expected #{total_entry_count} unique insertIds." \ | ||
" Only #{unique_insert_ids.size} found." | ||
end | ||
|
||
def test_insert_ids_not_added_if_present | ||
log_entry_with_empty_insert_id = log_entry(0).merge( | ||
DEFAULT_INSERT_ID_KEY => '') | ||
{ | ||
log_entry(0).merge(DEFAULT_INSERT_ID_KEY => INSERT_ID) => true, | ||
# Still generate insertId if it's an empty string | ||
log_entry_with_empty_insert_id => false | ||
}.each do |test_data| | ||
input_log_entry, retain_original_insert_id = test_data | ||
# Make a copy because the log entry gets modified by the filter plugin. | ||
log_entry = input_log_entry.dup | ||
d = create_driver | ||
d.run do | ||
d.emit(log_entry) | ||
end | ||
filtered_events = d.filtered_as_array | ||
|
||
assert_equal 1, filtered_events.size, 'Exact 1 log entry after' \ | ||
" filtering is expected. Test data: #{test_data}." | ||
event = filtered_events[0] | ||
assert_equal 3, event.size, 'Log event should include 3 elements: tag,' \ | ||
" time and record. Test data: #{test_data}." | ||
record = event[2] | ||
assert_true record.is_a?(Hash), "Log record #{record} should be a hash." \ | ||
" Test data: #{test_data}." | ||
assert_equal 0, record['id'], "Test data: #{test_data}." | ||
assert_equal TEST_MESSAGE, record['message'], "Test data: #{test_data}." | ||
insert_id = record[DEFAULT_INSERT_ID_KEY] | ||
assert_false insert_id.to_s.empty?, 'Insert ID should not be empty.' \ | ||
" Test data: #{test_data}." | ||
assert_equal retain_original_insert_id, | ||
input_log_entry[DEFAULT_INSERT_ID_KEY] == insert_id, | ||
"Input value is #{input_log_entry[DEFAULT_INSERT_ID_KEY]}." \ | ||
" Output value is #{insert_id}. Test data: #{test_data}." | ||
end | ||
end | ||
|
||
def create_driver(conf = APPLICATION_DEFAULT_CONFIG) | ||
Fluent::Test::FilterTestDriver.new( | ||
Fluent::Plugin::AddInsertIdsFilter).configure(conf, true) | ||
end | ||
|
||
def log_entry(index) | ||
{ | ||
'id' => index, | ||
'message' => TEST_MESSAGE | ||
} | ||
end | ||
end |