Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a filter plugin add_insert_ids. #246

Merged
merged 7 commits into from
Aug 29, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 23 additions & 14 deletions README.rdoc
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
= Google Cloud Logging plugin for {fluentd}[http://github.com/fluent/fluentd]

fluent-plugin-google-cloud is an
{output plugin for fluentd}[http://docs.fluentd.org/articles/output-plugin-overview]
which sends logs to the
{Stackdriver Logging API}[https://cloud.google.com/logging/docs/api/].
fluent-plugin-google-cloud gem includes two plugins:
1. A {filter plugin for fluentd}[http://docs.fluentd.org/articles/filter-plugin-overview]
that embeds insertIds into log entries to guarantee order and uniqueness.
2. An {output plugin for fluentd}[http://docs.fluentd.org/articles/output-plugin-overview]
which sends logs to the {Stackdriver Logging API}[https://cloud.google.com/logging/docs/api/].

This is an official Google Ruby gem.

Expand All @@ -12,8 +13,7 @@ This is an official Google Ruby gem.

== Installation

This gem is hosted at
{RubyGems.org}[https://rubygems.org/gems/fluent-plugin-google-cloud]
This gem is hosted at {RubyGems.org}[https://rubygems.org/gems/fluent-plugin-google-cloud]
and can be installed using:

$ gem install fluent-plugin-google-cloud
Expand All @@ -23,22 +23,31 @@ will also install and configure the gem.

== Configuration

To embed insertIds into log entries, specify <code>type add_insert_ids</code>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: missing @ here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

in a {filter clause}[https://docs.fluentd.org/v1.0/articles/config-file#(3)-%E2%80%9Cfilter%E2%80%9D:-event-processing-pipeline]
of your Fluentd configuration file, for example:

<filter **>
@type add_insert_ids
insert_id_key my_insert_id_field_name # Optional.
</filter>

insert_id_key can be used to customize the insertId field name.

To send logs to Google Cloud Logging, specify <code>type google_cloud</code>
in a
{match clause}[http://docs.fluentd.org/articles/config-file#2-ldquomatchrdquo-tell-fluentd-what-to-do]
of your fluentd configuration file, for example:
in a {match clause}[http://docs.fluentd.org/articles/config-file#2-ldquomatchrdquo-tell-fluentd-what-to-do]
of your Fluentd configuration file, for example:

<match **>
type google_cloud
@type google_cloud
</match>

No further configuration is required. The plugin uses
See detailed instructions on how to configure this output plugin {here}[https://cloud.google.com/logging/docs/agent/configuration#cloud-fluentd-config].
The plugin uses
{Google Application Default Credentials}[https://developers.google.com/identity/protocols/application-default-credentials]
for authorization - for additional information see
for authorization - for additional information see
{here}[https://cloud.google.com/logging/docs/agent/authorization].

<em>The previously documented parameters auth_method, private_key_email,
and private_key_path are removed, and can no longer be used.</em>

== Copyright

Expand Down
10 changes: 5 additions & 5 deletions fluent-plugin-google-cloud.gemspec
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
Gem::Specification.new do |gem|
gem.name = 'fluent-plugin-google-cloud'
gem.description = <<-eos
Fluentd output plugin for the Stackdriver Logging API, which will make
logs viewable in the Developer Console's log viewer and can optionally
store them in Google Cloud Storage and/or BigQuery.
Fluentd plugins for the Stackdriver Logging API, which will make logs
viewable in the Stackdriver Logs Viewer and can optionally store them
in Google Cloud Storage and/or BigQuery.
This is an official Google Ruby gem.
eos
gem.summary = 'fluentd output plugin for the Stackdriver Logging API'
gem.summary = 'fluentd plugins for the Stackdriver Logging API'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we not releasing the insertId plugin as a separate gem? Expediency?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expediency is one reason. Also the way it's used today is tied to out_google_cloud. The original proposal is to make it generic and not specific to Stackdriver (it makes more sense to create a new Github repo and go through launch process if that's the case).

There was a discussion thread in the design doc. We were debating it and ended up going with this approach.

gem.homepage =
'https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud'
gem.license = 'Apache-2.0'
gem.version = '0.6.23'
gem.authors = ['Ling Huang', 'Igor Peshansky']
gem.authors = ['Stackdriver Agents Team']
gem.email = ['stackdriver-agents@google.com']
gem.required_ruby_version = Gem::Requirement.new('>= 2.2')

Expand Down
104 changes: 104 additions & 0 deletions lib/fluent/plugin/filter_add_insert_ids.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Copyright 2018 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'fluent/plugin/filter'

module Fluent
module Plugin
# Fluentd filter plugin for adding insertIds to guarantee log entry order
# and uniqueness.
bmoyles0117 marked this conversation as resolved.
Show resolved Hide resolved
# Sample log entries enriched by this plugin:
# {
# "timestamp": "2017-08-22 13:35:28",
# "message": "1",
# "logging.googleapis.com/insertId": "aye7eakuf23h41aef0"
# }
# {
# "timestamp": "2017-08-22 13:35:28",
# "message": "2",
# "logging.googleapis.com/insertId": "aye7eakuf23h41aef1"
# }
# {
# "timestamp": "2017-08-22 13:35:28",
# "message": "3",
# "logging.googleapis.com/insertId": "aye7eakuf23h41aef2"
# }
class AddInsertIdsFilter < Filter
Fluent::Plugin.register_filter('add_insert_ids', self)

# Constants for configuration.
module ConfigConstants
# The default field name of insertIds in the log entry.
DEFAULT_INSERT_ID_KEY = 'logging.googleapis.com/insertId'.freeze
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has to be kept consistent with the output plugin, doesn't it? Alternatively, we can make the insertId key something else and simply add the correct value to the Stackdriver-specific configs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this has to be consistent with the output plugin.

The original proposal is to make this filter plugin generic with some other default key, so it's not Stackdriver specific. But after discussing (in the design doc) with the team, the consensus is that we should focus on Stackdriver use case for now. This is also why we are not releasing a separate gem for this plugin.

# The character size of the insertIds. This matches the setup in the
# Stackdriver Logging backend.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we link to any documentation about this value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried. I did not see it anywhere in the public sites. This size was taken from the implementation code. In fact, that's just the default size the backend will generate. It's not a "requirement".

INSERT_ID_SIZE = 17
# The characters that are allowed in the insertIds. This matches the
# allowed collection by the Stackdriver Logging Backend.
ALLOWED_CHARS = (Array(0..9) + Array('a'..'z')).freeze
end

include self::ConfigConstants

desc 'The field name for insertIds in the log record.'
config_param :insert_id_key, :string, default: DEFAULT_INSERT_ID_KEY

# Expose attr_readers for testing.
attr_reader :insert_id_key

def start
super
@log = $log # rubocop:disable Style/GlobalVars

# Initialize the insertID.
@log.info "Started the add_insert_ids plugin with #{@insert_id_key}" \
' as the insert ID key.'
@insert_id = initialize_insert_id
@log.info "Initialized the insert ID key to #{@insert_id}."
end

def configure(conf)
super
end

def shutdown
super
end

# rubocop:disable Style/UnusedMethodArgument
def filter(tag, time, record)
# Only generate and add an insertId field if the record is a hash and
# the insert ID field is not already set.
if record.is_a?(Hash) && !record.key?(@insert_id_key) \
&& record[@insert_id_key] != ''
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this does what you intended... You probably meant:

        if record.is_a?(Hash) && (!record.key?(@insert_id_key) || record[@insert_id_key] == '')

Alternatively, you could use:

        if record.is_a?(Hash) && record[@insert_id_key].to_s.empty?

(I think .empty? is preferred to == '' anyway).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, unit tests failed for this. I just fixed the logic and committed the unit tests.

record[@insert_id_key] = increment_insert_id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could just be:

        record[@insert_id_key] ||= increment_insert_id if record.is_a?(Hash)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just added a check against empty strings as Bryan suggested. Then it can't be converted into one line.

end
record
end
# rubocop:enable Style/UnusedMethodArgument

private

# Initialize the insertId to a random string.
def initialize_insert_id
Array.new(INSERT_ID_SIZE) { ALLOWED_CHARS.sample }.join
Copy link
Contributor

@bmoyles0117 bmoyles0117 Aug 29, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the method is named initialize_, it would make more sense to me if the actual assignment occurred here. WDYT?

@insert_id = Array.new(INSERT_ID_SIZE) { ALLOWED_CHARS.sample }.join

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was the one who proposed initialize, and it seems like it was a mistake. Maybe initial_insert_id or generate_initial_insert_id would work better. I like the fact that it's assigned in start.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to generate_initial_insert_id instead.

end

# Increment the insertId and return the new value.
def increment_insert_id
@insert_id = @insert_id.next
end
end
end
end
97 changes: 97 additions & 0 deletions test/plugin/test_filter_add_insert_ids.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Copyright 2018 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require_relative '../helper'

require 'fluent/test/driver/filter'
require 'fluent/plugin/filter_add_insert_ids'

# Unit tests for filter_add_insert_ids plugin.
class FilterAddInsertIdsTest < Test::Unit::TestCase
include Fluent::Plugin::AddInsertIdsFilter::ConfigConstants

CUSTOM_INSERT_ID_KEY = 'custom_insert_id_key'.freeze
TEST_MESSAGE = 'test message for add_insert_ids plugin.'.freeze
APPLICATION_DEFAULT_CONFIG = ''.freeze
INSERT_ID_KEY_CONFIG = %(
insert_id_key #{CUSTOM_INSERT_ID_KEY}
).freeze

def setup
Fluent::Test.setup
end

def test_configure_insert_id_key
Copy link
Contributor

@davidbtucker davidbtucker Aug 28, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be clearer to inline the values and split this into 2 tests:

def test_configure_insert_id_key_with_default
  d = create_driver('')
  assert_equal DEFAULT_INSERT_ID_KEY, d.instance.insert_id_key

def test_configure_insert_id_key_with_custom
  d = create_driver('insert_id_key custom_insert_id_key')
  assert_equal 'custom_insert_id_key', d.instance.insert_id_key

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rest of our unit tests mostly follow the test data provider pattern.

https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud/blob/master/test/plugin/base_test.rb#L65
https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud/blob/master/test/plugin/base_test.rb#L107

It's debatable which pattern is better. Let's discuss it and reach some consensus for future unit tests before introducing a stand-alone pattern. For this PR, I'll leave it as is.

If we do decide to switch, maybe this is a nice FixIt item to improve our tests' readability.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, let's defer.

{
APPLICATION_DEFAULT_CONFIG => DEFAULT_INSERT_ID_KEY,
INSERT_ID_KEY_CONFIG => CUSTOM_INSERT_ID_KEY
}.each do |config, insert_id_key|
d = create_driver(config)
assert_equal insert_id_key, d.instance.insert_id_key
end
end

def test_add_insert_ids
total_entry_count = 1000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this number be smaller (like 5)? Or do we need to test with a high volume?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The number was chose to be this high mainly to test against any potential race condition (there should not be).

d = create_driver
d.run do
total_entry_count.times do |index|
d.emit(log_entry(index))
end
end
filtered_events = d.filtered_as_array

assert_equal total_entry_count, filtered_events.size,
"#{total_entry_count} log entries after filtering is" \
" expected. Only #{filtered_events.size} are detected."
# The expected insertId will be assigned as we scan the first log entry.
expected_insert_id = nil
unique_insert_ids = Set.new
filtered_events.each_with_index do |event, index|
assert_equal 3, event.size, "Index #{index} failed. Log event should" \
' include 3 elements: tag, time and record.'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to include the actual size in the error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The actual size will be included in the error already. The

"Index #{index} failed. Log event should" \
                   ' include 3 elements: tag, time and record.'

does not overwrite the original not equal message. It only adds additional information.

record = event[2]
assert_true record.is_a?(Hash), "Index #{index} failed. Log record" \
" #{record} should be a hash."
assert_equal index, record['id'], "Index #{index} failed. Log entries" \
' should come in order.'
assert_equal TEST_MESSAGE, record['message'], "Index #{index} failed."

# Get the first insertID.
expected_insert_id = record[DEFAULT_INSERT_ID_KEY] if index == 0
insert_id = record[DEFAULT_INSERT_ID_KEY]
assert_equal expected_insert_id, insert_id, "Index #{index} failed."
expected_insert_id = expected_insert_id.next
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about checking to see if this is a unique value instead of mimicking the implementation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's a better alternative? I've been contemplating this and ended up adding the following line:

      assert_true record[DEFAULT_INSERT_ID_KEY] < expected_insert_id,
                  "Index #{index} failed. #{record[DEFAULT_INSERT_ID_KEY]}" \
                  " < #{expected_insert_id} is false."

Are you suggesting something like hashing all the insertIds and make sure we have 1000 unique ones?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, hashing or counting a set of distinct values SGTM

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a check against 1000 unique insertIds.

assert_true insert_id < expected_insert_id,
"Index #{index} failed. #{insert_id}" \
" < #{expected_insert_id} is false."
unique_insert_ids << insert_id
end
assert_equal total_entry_count, unique_insert_ids.size,
"Expected #{total_entry_count} unique insertIds." \
" Only #{unique_insert_ids.size} found."
end

def create_driver(conf = APPLICATION_DEFAULT_CONFIG)
Fluent::Test::FilterTestDriver.new(
Fluent::Plugin::AddInsertIdsFilter).configure(conf, true)
end

def log_entry(index)
{
'id' => index,
'message' => TEST_MESSAGE
}
end
end