Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Another InfluxDB 0.9 PR - Fix #31 & #32 + Previous PRs #33

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## 3.1.0
- New option to enable SSL/TLS encrypted communication to InfluxDB
- DB parameter now supports sprintf formatting

## 3.0.0
- Update to work with influxdb 0.9
- breaking change, renaming 'series' to 'measurement'
- breaking change for values of time_precision
- Special characters now properly escaped in tag key/value, field key, measurement

## 2.0.0
- Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully,
instead of using Thread.raise on the plugins' threads. Ref: https://github.com/elastic/logstash/pull/3895
Expand Down
1 change: 1 addition & 0 deletions CONTRIBUTORS
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Contributors:
* Kurt Hurtado (kurtado)
* Pier-Hugues Pellerin (ph)
* Richard Pijnenburg (electrical)
* Michael Laws (mikelaws)

Note: If you've sent us patches, bug reports, or otherwise contributed to
Logstash, and you aren't on the list above and want to be, please let us know
Expand Down
25 changes: 15 additions & 10 deletions lib/logstash/outputs/influxdb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class LogStash::Outputs::InfluxDB < LogStash::Outputs::Base

config_name "influxdb"

# The database to write
# The database to write - supports sprintf formatting
config :db, :validate => :string, :default => "statistics"

# The retention policy to use
Expand All @@ -37,6 +37,9 @@ class LogStash::Outputs::InfluxDB < LogStash::Outputs::Base
# The password for the user who access to the named database
config :password, :validate => :password, :default => nil

# Enable SSL/TLS secured communication to InfluxDB
config :ssl, :validate => :boolean, :default => false

# Measurement name - supports sprintf formatting
config :measurement, :validate => :string, :default => "logstash"

Expand Down Expand Up @@ -113,10 +116,7 @@ def register

@client = Manticore::Client.new
@queue = []

@query_params = "db=#{@db}&rp=#{@retention_policy}&precision=#{@time_precision}&u=#{@user}&p=#{@password.value}"
@base_url = "http://#{@host}:#{@port}/write"
@url = "#{@base_url}?#{@query_params}"
@protocol = @ssl ? "https" : "http"

buffer_initialize(
:max_items => @flush_size,
Expand Down Expand Up @@ -162,18 +162,23 @@ def receive(event)
}
event_hash["tags"] = tags unless tags.empty?

buffer_receive(event_hash)
buffer_receive(event_hash, event.sprintf(@db))
end # def receive


def flush(events, teardown = false)
@logger.debug? and @logger.debug("Flushing #{events.size} events to #{@url} - Teardown? #{teardown}")
post(events_to_request_body(events))
def flush(events, database, teardown = false)
@logger.debug? and @logger.debug("Flushing #{events.size} events to #{database} - Teardown? #{teardown}")
post(events_to_request_body(events), database)
end # def flush


def post(body)
def post(body, database, proto = @protocol)
begin
@query_params = "db=#{database}&rp=#{@retention_policy}&precision=#{@time_precision}&u=#{@user}&p=#{@password.value}"
@base_url = "#{proto}://#{@host}:#{@port}/write"
@url = "#{@base_url}?#{@query_params}"

@logger.debug? and @logger.debug("POSTing to #{@url}")
@logger.debug? and @logger.debug("Post body: #{body}")
response = @client.post!(@url, :body => body)

Expand Down
2 changes: 1 addition & 1 deletion logstash-output-influxdb.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-output-influxdb'
s.version = '3.0.0'
s.version = '3.1.0'
s.licenses = ['Apache License (2.0)']
s.summary = "This output lets you output Metrics to InfluxDB"
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
Expand Down
155 changes: 153 additions & 2 deletions spec/outputs/influxdb_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@

before do
subject.register
allow(subject).to receive(:post).with(result)
# Added db name parameter to post - M.Laws
allow(subject).to receive(:post).with(result, "statistics")

2.times do
subject.receive(LogStash::Event.new("foo" => "1", "bar" => "2", "time" => "3", "type" => "generator"))
Expand All @@ -39,7 +40,7 @@
let(:result) { "logstash foo=\"1\",bar=\"2\" 3\nlogstash foo=\"1\",bar=\"2\" 3" }

it "should receive 2 events, flush and call post with 2 items json array" do
expect(subject).to have_received(:post).with(result)
expect(subject).to have_received(:post).with(result, "statistics")
end

end
Expand Down Expand Up @@ -354,4 +355,154 @@
pipeline.run
end
end

# Test issue #32 - Add support for HTTPS via configuration
# --------------------------------------------------------
# A simple test to verify that setting the ssl configuration option works
# similar to other Logstash output plugins (specifically the Elasticsearch
# output plugin).
context "setting the ssl configuration option to true" do
let(:config) do <<-CONFIG
input {
generator {
message => "foo=1 bar=2 baz=3 time=4"
count => 1
type => "generator"
}
}

filter {
kv { }
}

output {
influxdb {
host => "localhost"
ssl => true
measurement => "barfoo"
allow_time_override => true
use_event_fields_for_data_points => true
exclude_fields => [ "@version", "@timestamp", "sequence",
"message", "type", "host" ]
}
}
CONFIG
end

let(:expected_url) { 'https://localhost:8086/write?db=statistics&rp=default&precision=ms&u=&p=' }
let(:expected_body) { 'barfoo foo="1",bar="2",baz="3" 4' }

it "should POST to an https URL" do
expect_any_instance_of(Manticore::Client).to receive(:post!).with(expected_url, body: expected_body)
pipeline.run
end
end

# Test issue #31 - Run "db" parameter through event.sprintf() to support...
# -------------------------------------------------------------------------
# This test is intended to verify writes to multiple measurements in A SINGLE
# DATABASE continue to work *after* implementing #31. Also verifies that
# sprintf formatting is supported in the measurement name.
context "receiving 3 points between 2 measurements in 1 database" do
let(:config) do <<-CONFIG
input {
generator {
lines => [
"foo=1 bar=2 baz=m1 time=1",
"foo=3 bar=4 baz=m2 time=2",
"foo=5 bar=6 baz=m2 time=3"
]
count => 1
type => "generator"
}
}

filter {
kv { }
}

output {
influxdb {
host => "localhost"
db => "barfoo"
measurement => "%{baz}"
allow_time_override => true
use_event_fields_for_data_points => true
exclude_fields => [ "@version", "@timestamp", "sequence",
"message", "type", "host" ]
}
}
CONFIG
end

let(:expected_url) { 'http://localhost:8086/write?db=barfoo&rp=default&precision=ms&u=&p=' }
let(:expected_body) { "m1 foo=\"1\",bar=\"2\",baz=\"m1\" 1\nm2 foo=\"3\",bar=\"4\",baz=\"m2\" 2\nm2 foo=\"5\",bar=\"6\",baz=\"m2\" 3" }

it "should result in a single POST (one per database)" do
expect_any_instance_of(Manticore::Client).to receive(:post!).once
pipeline.run
end

it "should POST in bulk format" do
expect_any_instance_of(Manticore::Client).to receive(:post!).with(expected_url, body: expected_body)
pipeline.run
end
end

# Test issue #31 - Run "db" parameter through event.sprintf() to support...
# -------------------------------------------------------------------------
# This test is intended to verify writes to multiple measurements in MULTIPLE
# DATABASES result in separate bulk POSTs (one for each database in the
# buffer), and the correct measurements being written to the correct db.
# Also verifies that sprintf formatting is correctly supported in the
# database name.
context "receiving 4 points between 2 measurements in 2 databases" do
let(:config) do <<-CONFIG
input {
generator {
lines => [
"foo=1 bar=db1 baz=m1 time=1",
"foo=2 bar=db1 baz=m2 time=2",
"foo=3 bar=db2 baz=m1 time=3",
"foo=4 bar=db2 baz=m2 time=4"
]
count => 1
type => "generator"
}
}

filter {
kv { }
}

output {
influxdb {
host => "localhost"
db => "%{bar}"
measurement => "%{baz}"
allow_time_override => true
use_event_fields_for_data_points => true
exclude_fields => [ "@version", "@timestamp", "sequence",
"message", "type", "host" ]
}
}
CONFIG
end

let(:expected_url_db1) { 'http://localhost:8086/write?db=db1&rp=default&precision=ms&u=&p=' }
let(:expected_url_db2) { 'http://localhost:8086/write?db=db2&rp=default&precision=ms&u=&p=' }
let(:expected_body_db1) { "m1 foo=\"1\",bar=\"db1\",baz=\"m1\" 1\nm2 foo=\"2\",bar=\"db1\",baz=\"m2\" 2" }
let(:expected_body_db2) { "m1 foo=\"3\",bar=\"db2\",baz=\"m1\" 3\nm2 foo=\"4\",bar=\"db2\",baz=\"m2\" 4" }

it "should result in two POSTs (one per database)" do
expect_any_instance_of(Manticore::Client).to receive(:post!).twice
pipeline.run
end

it "should post in bulk format" do
expect_any_instance_of(Manticore::Client).to receive(:post!).with(expected_url_db1, body: expected_body_db1)
expect_any_instance_of(Manticore::Client).to receive(:post!).with(expected_url_db2, body: expected_body_db2)
pipeline.run
end
end
end