Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka plugin refactor #375

Merged
merged 1 commit into from
Nov 19, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
## v0.2.3 [unreleased]

### Release Notes
- **breaking change** The `kafka` plugin has been renamed to `kafka_consumer`.
and most of the config option names have changed.
This only affects the kafka consumer _plugin_ (not the
output). There were a number of problems with the kafka plugin that led to it
only collecting data once at startup, so the kafka plugin was basically non-
functional.
- Riemann output added

### Features
- [#379](https://github.com/influxdb/telegraf/pull/379): Riemann output, thanks @allenj!
- [#375](https://github.com/influxdb/telegraf/pull/375): kafka_consumer service plugin.

### Bugfixes
- [#371](https://github.com/influxdb/telegraf/issues/371): Kafka consumer plugin not functioning.

## v0.2.2 [2015-11-18]

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ Telegraf currently has support for collecting metrics from:
* haproxy
* httpjson (generic JSON-emitting http service plugin)
* jolokia (remote JMX with JSON over HTTP)
* kafka_consumer
* leofs
* lustre2
* memcached
Expand Down Expand Up @@ -197,6 +196,7 @@ Telegraf currently has support for collecting metrics from:
Telegraf can collect metrics via the following services:

* statsd
* kafka_consumer

We'll be adding support for many more over the coming months. Read on if you
want to add support for another service or third-party API.
Expand Down
44 changes: 19 additions & 25 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

"github.com/influxdb/telegraf/plugins"
"github.com/influxdb/telegraf/plugins/exec"
"github.com/influxdb/telegraf/plugins/kafka_consumer"
"github.com/influxdb/telegraf/plugins/memcached"
"github.com/influxdb/telegraf/plugins/procstat"
"github.com/naoina/toml"
"github.com/naoina/toml/ast"
Expand Down Expand Up @@ -205,17 +205,14 @@ func TestConfig_parsePlugin(t *testing.T) {
pluginConfigurationFieldsSet: make(map[string][]string),
}

subtbl := tbl.Fields["kafka"].(*ast.Table)
err = c.parsePlugin("kafka", subtbl)
subtbl := tbl.Fields["memcached"].(*ast.Table)
err = c.parsePlugin("memcached", subtbl)

kafka := plugins.Plugins["kafka"]().(*kafka_consumer.Kafka)
kafka.ConsumerGroupName = "telegraf_metrics_consumers"
kafka.Topic = "topic_with_metrics"
kafka.ZookeeperPeers = []string{"test.example.com:2181"}
kafka.BatchSize = 1000
memcached := plugins.Plugins["memcached"]().(*memcached.Memcached)
memcached.Servers = []string{"localhost"}

kConfig := &ConfiguredPlugin{
Name: "kafka",
mConfig := &ConfiguredPlugin{
Name: "memcached",
Drop: []string{"other", "stuff"},
Pass: []string{"some", "strings"},
TagDrop: []TagFilter{
Expand All @@ -233,10 +230,10 @@ func TestConfig_parsePlugin(t *testing.T) {
Interval: 5 * time.Second,
}

assert.Equal(t, kafka, c.plugins["kafka"],
"Testdata did not produce a correct kafka struct.")
assert.Equal(t, kConfig, c.pluginConfigurations["kafka"],
"Testdata did not produce correct kafka metadata.")
assert.Equal(t, memcached, c.plugins["memcached"],
"Testdata did not produce a correct memcached struct.")
assert.Equal(t, mConfig, c.pluginConfigurations["memcached"],
"Testdata did not produce correct memcached metadata.")
}

func TestConfig_LoadDirectory(t *testing.T) {
Expand All @@ -249,14 +246,11 @@ func TestConfig_LoadDirectory(t *testing.T) {
t.Error(err)
}

kafka := plugins.Plugins["kafka"]().(*kafka_consumer.Kafka)
kafka.ConsumerGroupName = "telegraf_metrics_consumers"
kafka.Topic = "topic_with_metrics"
kafka.ZookeeperPeers = []string{"test.example.com:2181"}
kafka.BatchSize = 10000
memcached := plugins.Plugins["memcached"]().(*memcached.Memcached)
memcached.Servers = []string{"192.168.1.1"}

kConfig := &ConfiguredPlugin{
Name: "kafka",
mConfig := &ConfiguredPlugin{
Name: "memcached",
Drop: []string{"other", "stuff"},
Pass: []string{"some", "strings"},
TagDrop: []TagFilter{
Expand Down Expand Up @@ -296,10 +290,10 @@ func TestConfig_LoadDirectory(t *testing.T) {

pConfig := &ConfiguredPlugin{Name: "procstat"}

assert.Equal(t, kafka, c.plugins["kafka"],
"Merged Testdata did not produce a correct kafka struct.")
assert.Equal(t, kConfig, c.pluginConfigurations["kafka"],
"Merged Testdata did not produce correct kafka metadata.")
assert.Equal(t, memcached, c.plugins["memcached"],
"Merged Testdata did not produce a correct memcached struct.")
assert.Equal(t, mConfig, c.pluginConfigurations["memcached"],
"Merged Testdata did not produce correct memcached metadata.")

assert.Equal(t, ex, c.plugins["exec"],
"Merged Testdata did not produce a correct exec struct.")
Expand Down
5 changes: 3 additions & 2 deletions outputs/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ type InfluxDB struct {
}

var sampleConfig = `
# The full HTTP or UDP endpoint URL for your InfluxDB instance
# Multiple urls can be specified for InfluxDB cluster support.
# The full HTTP or UDP endpoint URL for your InfluxDB instance.
# Multiple urls can be specified but it is assumed that they are part of the same
# cluster, this means that only ONE of the urls will be written to each interval.
# urls = ["udp://localhost:8089"] # UDP endpoint example
urls = ["http://localhost:8086"] # required
# The target database for metrics (telegraf will create it if not exists)
Expand Down
Loading