Add a filter plugin add_insert_ids. #246

Merged
merged 7 commits into from
Aug 29, 2018
39 changes: 24 additions & 15 deletions README.rdoc
@@ -1,9 +1,10 @@
= Google Cloud Logging plugin for {fluentd}[http://github.com/fluent/fluentd]

fluent-plugin-google-cloud is an
{output plugin for fluentd}[http://docs.fluentd.org/articles/output-plugin-overview]
which sends logs to the
{Stackdriver Logging API}[https://cloud.google.com/logging/docs/api/].
fluent-plugin-google-cloud gem includes two plugins:
1. A {filter plugin for fluentd}[http://docs.fluentd.org/articles/filter-plugin-overview]
that embeds insertIds into log entries to guarantee order and uniqueness.
2. An {output plugin for fluentd}[http://docs.fluentd.org/articles/output-plugin-overview]
which sends logs to the {Stackdriver Logging API}[https://cloud.google.com/logging/docs/api/].

This is an official Google Ruby gem.

@@ -12,8 +13,7 @@ This is an official Google Ruby gem.

== Installation

This gem is hosted at
{RubyGems.org}[https://rubygems.org/gems/fluent-plugin-google-cloud]
This gem is hosted at {RubyGems.org}[https://rubygems.org/gems/fluent-plugin-google-cloud]
and can be installed using:

$ gem install fluent-plugin-google-cloud
@@ -23,22 +23,31 @@ will also install and configure the gem.

== Configuration

To send logs to Google Cloud Logging, specify <code>type google_cloud</code>
in a
{match clause}[http://docs.fluentd.org/articles/config-file#2-ldquomatchrdquo-tell-fluentd-what-to-do]
of your fluentd configuration file, for example:
To embed insertIds into log entries, specify <code>@type add_insert_ids</code>
in a {filter clause}[https://docs.fluentd.org/v1.0/articles/config-file#(3)-%E2%80%9Cfilter%E2%80%9D:-event-processing-pipeline]
of your Fluentd configuration file, for example:

<filter **>
@type add_insert_ids
insert_id_key my_insert_id_field_name # Optional.
</filter>

insert_id_key can be used to customize the insertId field name.

To send logs to Google Cloud Logging, specify <code>@type google_cloud</code>
in a {match clause}[http://docs.fluentd.org/articles/config-file#2-ldquomatchrdquo-tell-fluentd-what-to-do]
of your Fluentd configuration file, for example:

<match **>
type google_cloud
@type google_cloud
</match>

No further configuration is required. The plugin uses
See detailed instructions on how to configure this output plugin {here}[https://cloud.google.com/logging/docs/agent/configuration#cloud-fluentd-config].
The plugin uses
{Google Application Default Credentials}[https://developers.google.com/identity/protocols/application-default-credentials]
for authorization - for additional information see
for authorization - for additional information see
{here}[https://cloud.google.com/logging/docs/agent/authorization].

<em>The previously documented parameters auth_method, private_key_email,
and private_key_path are removed, and can no longer be used.</em>

== Copyright

10 changes: 5 additions & 5 deletions fluent-plugin-google-cloud.gemspec
@@ -1,17 +1,17 @@
Gem::Specification.new do |gem|
gem.name = 'fluent-plugin-google-cloud'
gem.description = <<-eos
Fluentd output plugin for the Stackdriver Logging API, which will make
logs viewable in the Developer Console's log viewer and can optionally
store them in Google Cloud Storage and/or BigQuery.
Fluentd plugins for the Stackdriver Logging API, which will make logs
viewable in the Stackdriver Logs Viewer and can optionally store them
in Google Cloud Storage and/or BigQuery.
This is an official Google Ruby gem.
eos
gem.summary = 'fluentd output plugin for the Stackdriver Logging API'
gem.summary = 'fluentd plugins for the Stackdriver Logging API'
gem.homepage =
'https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud'
gem.license = 'Apache-2.0'
gem.version = '0.6.23'
gem.authors = ['Ling Huang', 'Igor Peshansky']
gem.authors = ['Stackdriver Agents Team']
gem.email = ['stackdriver-agents@google.com']
gem.required_ruby_version = Gem::Requirement.new('>= 2.2')

103 changes: 103 additions & 0 deletions lib/fluent/plugin/filter_add_insert_ids.rb
@@ -0,0 +1,103 @@
# Copyright 2018 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'fluent/plugin/filter'

module Fluent
module Plugin
# Fluentd filter plugin for adding insertIds to guarantee log entry order
# and uniqueness.
# Sample log entries enriched by this plugin:
# {
# "timestamp": "2017-08-22 13:35:28",
# "message": "1",
# "logging.googleapis.com/insertId": "aye7eakuf23h41aef0"
# }
# {
# "timestamp": "2017-08-22 13:35:28",
# "message": "2",
# "logging.googleapis.com/insertId": "aye7eakuf23h41aef1"
# }
# {
# "timestamp": "2017-08-22 13:35:28",
# "message": "3",
# "logging.googleapis.com/insertId": "aye7eakuf23h41aef2"
# }
class AddInsertIdsFilter < Filter
Fluent::Plugin.register_filter('add_insert_ids', self)

# Constants for configuration.
module ConfigConstants
# The default field name of insertIds in the log entry.
DEFAULT_INSERT_ID_KEY = 'logging.googleapis.com/insertId'.freeze
Member:
This has to be kept consistent with the output plugin, doesn't it? Alternatively, we can make the insertId key something else and simply add the correct value to the Stackdriver-specific configs.

Contributor Author:
Yes, this has to be consistent with the output plugin.

The original proposal was to make this filter plugin generic, with some other default key, so that it would not be Stackdriver-specific. After discussing it with the team (in the design doc), the consensus was that we should focus on the Stackdriver use case for now. This is also why we are not releasing a separate gem for this plugin.

# The character size of the insertIds. This matches the setup in the
# Stackdriver Logging backend.
Contributor:
Can we link to any documentation about this value?

Contributor Author:
I tried. I did not see it anywhere in the public sites. This size was taken from the implementation code. In fact, that's just the default size the backend will generate. It's not a "requirement".
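For readers following this thread, here is a minimal standalone sketch of how an ID of this size can be generated and then advanced. It copies the two constants from this PR but runs outside Fluentd; the helper name `random_insert_id` is hypothetical and only used for illustration.

```ruby
# Standalone sketch: constants copied from this PR, helper name is hypothetical.
INSERT_ID_SIZE = 17
ALLOWED_CHARS = (Array(0..9) + Array('a'..'z')).freeze

# Build a random ID by sampling one allowed character per position.
# Array#join converts the integer digits to strings.
def random_insert_id
  Array.new(INSERT_ID_SIZE) { ALLOWED_CHARS.sample }.join
end

first_id = random_insert_id
# String#next advances the rightmost alphanumeric character and carries
# to the left when that character wraps around.
second_id = first_id.next

puts first_id
puts second_id
```

Because the size is only the backend's default and not a requirement, nothing in this sketch depends on the value 17 beyond the length of the generated string.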

INSERT_ID_SIZE = 17
# The characters that are allowed in the insertIds. This matches the
# allowed collection by the Stackdriver Logging Backend.
ALLOWED_CHARS = (Array(0..9) + Array('a'..'z')).freeze
end

include self::ConfigConstants

desc 'The field name for insertIds in the log record.'
config_param :insert_id_key, :string, default: DEFAULT_INSERT_ID_KEY

# Expose attr_readers for testing.
attr_reader :insert_id_key

def start
super
@log = $log # rubocop:disable Style/GlobalVars

# Initialize the insertID.
@log.info "Started the add_insert_ids plugin with #{@insert_id_key}" \
' as the insert ID key.'
@insert_id = generate_initial_insert_id
@log.info "Initialized the insert ID to #{@insert_id}."
end

def configure(conf)
super
end

def shutdown
super
end

# rubocop:disable Style/UnusedMethodArgument
def filter(tag, time, record)
# Only generate and add an insertId field if the record is a hash and
# the insert ID field is not already set (or set to an empty string).
if record.is_a?(Hash) && record[@insert_id_key].to_s.empty?
record[@insert_id_key] = increment_insert_id
end
record
end
# rubocop:enable Style/UnusedMethodArgument

private

# Generate a random string as the initial insertId.
def generate_initial_insert_id
Array.new(INSERT_ID_SIZE) { ALLOWED_CHARS.sample }.join
end

# Increment the insertId and return the new value.
def increment_insert_id
@insert_id = @insert_id.next
end
end
end
end
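The filter logic above can be tried without Fluentd using a small stand-in. This is a sketch under stated assumptions, not the plugin itself: the class name `InsertIdStamper` and its `stamp` method are hypothetical, and the starting ID is fixed (taken from the sample entries) so the behaviour is easy to follow.

```ruby
# Hypothetical stand-in for the plugin, with no Fluentd dependency.
class InsertIdStamper
  def initialize(insert_id_key, initial_insert_id)
    @insert_id_key = insert_id_key
    @insert_id = initial_insert_id
  end

  # Add an insertId only when the record is a Hash whose insertId field
  # is missing or empty; every other record is returned untouched.
  def stamp(record)
    if record.is_a?(Hash) && record[@insert_id_key].to_s.empty?
      @insert_id = @insert_id.next
      record[@insert_id_key] = @insert_id
    end
    record
  end
end

KEY = 'logging.googleapis.com/insertId'.freeze
stamper = InsertIdStamper.new(KEY, 'aye7eakuf23h41aef0')

fresh = stamper.stamp({ 'message' => '1' })
kept = stamper.stamp({ 'message' => '2', KEY => 'preset' })
blank = stamper.stamp({ 'message' => '3', KEY => '' })

puts fresh[KEY]
puts kept[KEY]
puts blank[KEY]
```

As in the plugin, the counter is advanced before it is assigned, so the randomly chosen initial value itself is never attached to a record.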
135 changes: 135 additions & 0 deletions test/plugin/test_filter_add_insert_ids.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Copyright 2018 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require_relative '../helper'

require 'fluent/test/driver/filter'
require 'fluent/plugin/filter_add_insert_ids'

# Unit tests for filter_add_insert_ids plugin.
class FilterAddInsertIdsTest < Test::Unit::TestCase
include Fluent::Plugin::AddInsertIdsFilter::ConfigConstants

CUSTOM_INSERT_ID_KEY = 'custom_insert_id_key'.freeze
INSERT_ID = 'aeyr82r92h249gh9h'.freeze
TEST_MESSAGE = 'test message for add_insert_ids plugin.'.freeze
APPLICATION_DEFAULT_CONFIG = ''.freeze
INSERT_ID_KEY_CONFIG = %(
insert_id_key #{CUSTOM_INSERT_ID_KEY}
).freeze

def setup
Fluent::Test.setup
end

def test_configure_insert_id_key
Contributor (@davidbtucker, Aug 28, 2018):
It might be clearer to inline the values and split this into 2 tests:

def test_configure_insert_id_key_with_default
  d = create_driver('')
  assert_equal DEFAULT_INSERT_ID_KEY, d.instance.insert_id_key
end

def test_configure_insert_id_key_with_custom
  d = create_driver('insert_id_key custom_insert_id_key')
  assert_equal 'custom_insert_id_key', d.instance.insert_id_key
end

Member:
+1.

Contributor Author:
The rest of our unit tests mostly follow the test data provider pattern.

https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud/blob/master/test/plugin/base_test.rb#L65
https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud/blob/master/test/plugin/base_test.rb#L107

It's debatable which pattern is better. Let's discuss it and reach some consensus for future unit tests before introducing a stand-alone pattern. For this PR, I'll leave it as is.

If we do decide to switch, maybe this is a nice FixIt item to improve our tests' readability.

Member:
You're right, let's defer.
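For context on the two styles discussed in this thread, the data-provider pattern boils down to one loop over config-to-expectation pairs. The sketch below shows that shape in plain Ruby; `configured_key` is a hypothetical stand-in for `create_driver(config).instance.insert_id_key`, written here only so the example runs without Fluentd.

```ruby
DEFAULT_KEY = 'logging.googleapis.com/insertId'.freeze

# Hypothetical stand-in for create_driver(config).instance.insert_id_key:
# pick the configured key out of the config text, else fall back to the default.
def configured_key(config)
  match = config.match(/insert_id_key\s+(\S+)/)
  match ? match[1] : DEFAULT_KEY
end

# Data-provider style: each pair is one test case, checked by the same loop.
cases = {
  '' => DEFAULT_KEY,
  'insert_id_key custom_insert_id_key' => 'custom_insert_id_key'
}
results = cases.map { |config, expected| configured_key(config) == expected }

puts results.inspect
```

The trade-off is the one raised above: the loop keeps cases compact, while separate test methods report each failing case under its own name.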

{
APPLICATION_DEFAULT_CONFIG => DEFAULT_INSERT_ID_KEY,
INSERT_ID_KEY_CONFIG => CUSTOM_INSERT_ID_KEY
}.each do |config, insert_id_key|
d = create_driver(config)
assert_equal insert_id_key, d.instance.insert_id_key
end
end

def test_add_insert_ids
total_entry_count = 1000
Contributor:
Could this number be smaller (like 5)? Or do we need to test with a high volume?

Contributor Author:
The number was chosen to be this high mainly to test against any potential race condition (there should not be any).
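The property the test checks over 1000 entries can be seen in isolation with `String#next`. This is a single-threaded sketch, so it does not exercise any race condition; it only shows that successive IDs from a fixed starting value (taken from the sample entries in this PR) stay unique and in ascending order.

```ruby
require 'set'

# Fixed starting ID, copied from the sample entries in this PR.
insert_id = 'aye7eakuf23h41aef0'

# Generate 1000 successive IDs the same way the filter does.
ids = Array.new(1000) { insert_id = insert_id.next }

unique_count = Set.new(ids).size
in_order = ids.each_cons(2).all? { |a, b| a < b }

puts "unique: #{unique_count}, ordered: #{in_order}"
```

Ordering holds here because the string length never changes during the run; a starting value whose leftmost character wraps around would grow by one character and no longer compare in sequence.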

d = create_driver
d.run do
total_entry_count.times do |index|
d.emit(log_entry(index))
end
end
filtered_events = d.filtered_as_array

assert_equal total_entry_count, filtered_events.size,
"#{total_entry_count} log entries after filtering are" \
" expected. Only #{filtered_events.size} are detected."
# The expected insertId will be assigned as we scan the first log entry.
expected_insert_id = nil
unique_insert_ids = Set.new
filtered_events.each_with_index do |event, index|
assert_equal 3, event.size, "Index #{index} failed. Log event should" \
' include 3 elements: tag, time and record.'
record = event[2]
assert_true record.is_a?(Hash), "Index #{index} failed. Log record" \
" #{record} should be a hash."
assert_equal index, record['id'], "Index #{index} failed. Log entries" \
' should come in order.'
assert_equal TEST_MESSAGE, record['message'], "Index #{index} failed."

# Get the first insertID.
expected_insert_id = record[DEFAULT_INSERT_ID_KEY] if index == 0
insert_id = record[DEFAULT_INSERT_ID_KEY]
assert_equal expected_insert_id, insert_id, "Index #{index} failed."
expected_insert_id = expected_insert_id.next
assert_true insert_id < expected_insert_id,
"Index #{index} failed. #{insert_id}" \
" < #{expected_insert_id} is false."
unique_insert_ids << insert_id
end
assert_equal total_entry_count, unique_insert_ids.size,
"Expected #{total_entry_count} unique insertIds." \
" Only #{unique_insert_ids.size} found."
end

def test_insert_ids_not_added_if_present
log_entry_with_empty_insert_id = log_entry(0).merge(
DEFAULT_INSERT_ID_KEY => '')
{
log_entry(0).merge(DEFAULT_INSERT_ID_KEY => INSERT_ID) => true,
# Still generate insertId if it's an empty string
log_entry_with_empty_insert_id => false
}.each do |test_data|
input_log_entry, retain_original_insert_id = test_data
# Make a copy because the log entry gets modified by the filter plugin.
log_entry = input_log_entry.dup
d = create_driver
d.run do
d.emit(log_entry)
end
filtered_events = d.filtered_as_array

assert_equal 1, filtered_events.size, 'Exactly 1 log entry after' \
" filtering is expected. Test data: #{test_data}."
event = filtered_events[0]
assert_equal 3, event.size, 'Log event should include 3 elements: tag,' \
" time and record. Test data: #{test_data}."
record = event[2]
assert_true record.is_a?(Hash), "Log record #{record} should be a hash." \
" Test data: #{test_data}."
assert_equal 0, record['id'], "Test data: #{test_data}."
assert_equal TEST_MESSAGE, record['message'], "Test data: #{test_data}."
insert_id = record[DEFAULT_INSERT_ID_KEY]
assert_false insert_id.to_s.empty?, 'Insert ID should not be empty.' \
" Test data: #{test_data}."
assert_equal retain_original_insert_id,
input_log_entry[DEFAULT_INSERT_ID_KEY] == insert_id,
"Input value is #{input_log_entry[DEFAULT_INSERT_ID_KEY]}." \
" Output value is #{insert_id}. Test data: #{test_data}."
end
end

def create_driver(conf = APPLICATION_DEFAULT_CONFIG)
Fluent::Test::FilterTestDriver.new(
Fluent::Plugin::AddInsertIdsFilter).configure(conf, true)
end

def log_entry(index)
{
'id' => index,
'message' => TEST_MESSAGE
}
end
end