Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka recipe #113

Merged
merged 8 commits into from
Apr 3, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions .kitchen.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,40 @@ suites:
instances:
- host: localhost
port: 27017

# Suite: datadog::kafka — exercises the JMX-based kafka check template with
# every optional instance key set (name, user/password, java_bin_path and
# the SSL trust-store settings); verified by bats/kafka_config.bats.
- name: dd-agent-kafka
  run_list:
    - recipe[datadog::dd-handler]
    - recipe[datadog::kafka]
  attributes:
    datadog:
      api_key: somethingnotnil
      application_key: alsonotnil
      kafka:
        instances:
          - host: localhost
            port: 9999
            name: my_kafka
            user: username
            password: password
            java_bin_path: /path/to/java
            trust_store_path: /path/to/trustStore.jks
            trust_store_password: password

# Suite: datadog::kafka_consumer — consumer-offset check with one consumer
# group mapping a topic to a partition list; verified by the
# kafka_consumer bats test.
- name: dd-agent-kafka-consumer
  run_list:
    - recipe[datadog::dd-handler]
    - recipe[datadog::kafka_consumer]
  attributes:
    datadog:
      api_key: somethingnotnil
      application_key: alsonotnil
      kafka_consumer:
        instances:
          - kafka_connect_str: localhost:19092
            zk_connect_str: localhost:2181
            zk_prefix: /0.8
            consumer_groups:
              my_consumer:
                my_topic: [0, 1, 4, 12]

25 changes: 25 additions & 0 deletions recipes/kafka.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
include_recipe "datadog::dd-agent"

# Monitor Kafka via the Datadog Agent's JMX-based "kafka" check.
# Writes /etc/dd-agent/conf.d/kafka.yaml from templates/default/kafka.yaml.erb.
#
# Assuming you have 2 clusters "test" and "prod",
# one with and one without authentication,
# you need to set up the following attributes.
# NOTE: the template reads the key "user" (not "username"); it also honors
# the optional keys java_bin_path, trust_store_path and trust_store_password.
# node.datadog.kafka.instances = [
#   {
#     :host => "localhost",
#     :port => "9999",
#     :name => "prod",
#     :user => "username",
#     :password => "secret"
#   },
#   {
#     :host => "localhost",
#     :port => "8199",
#     :name => "test"
#   }
# ]

datadog_monitor "kafka" do
  instances node['datadog']['kafka']['instances']
end
16 changes: 16 additions & 0 deletions recipes/kafka_consumer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
include_recipe "datadog::dd-agent"

# Monitor Kafka consumer offsets via the Datadog Agent's kafka_consumer
# check. Writes /etc/dd-agent/conf.d/kafka_consumer.yaml from
# templates/default/kafka_consumer.yaml.erb.
#
# Required attribute shape:
# node.datadog.kafka_consumer.instances = [
#   {
#     :kafka_connect_str => "localhost:19092",
#     :zk_connect_str => "localhost:2181",
#     :zk_prefix => "/0.8"
#   }
# ]

# Resolve the attribute once; the resource block is a closure, so the
# local is visible inside it.
consumer_instances = node['datadog']['kafka_consumer']['instances']

datadog_monitor "kafka_consumer" do
  instances consumer_instances
end
164 changes: 164 additions & 0 deletions templates/default/kafka.yaml.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
<%# kafka.yaml.erb — renders the Datadog Agent's JMX-based kafka check. %>
<%# Populated by recipes/kafka.rb from node['datadog']['kafka']['instances']; %>
<%# host and port are required per instance, all other keys are optional. %>
instances:
<% @instances.each do |i| -%>
  - host: <%= i['host'] %>
    port: <%= i['port'] %>
<% if i['name'] -%>
    name: <%= i['name'] %>
<% end -%>
<% if i['user'] -%>
    user: <%= i['user'] %>
<% end -%>
<% if i['password'] -%>
    password: <%= i['password'] %>
<% end -%>
<% if i['java_bin_path'] -%>
    java_bin_path: <%= i['java_bin_path'] %> # Optional, should be set if the agent cannot find your java executable
<% end -%>
<% if i['trust_store_path'] -%>
    trust_store_path: <%= i['trust_store_path'] %> # Optional, should be set if ssl is enabled
<% end -%>
<% if i['trust_store_password'] -%>
    trust_store_password: <%= i['trust_store_password'] %>
<% end -%>
<% end -%>

init_config:
  is_jmx: true

  # Metrics collected by this check. You should not have to modify this.
  conf:
    #
    # Aggregate cluster stats
    #
    - include:
      domain: '"kafka.server"'
      bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesOutPerSec"'
      attribute:
        MeanRate:
          metric_type: counter
          alias: kafka.net.bytes_out
    - include:
      domain: '"kafka.server"'
      bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec"'
      attribute:
        MeanRate:
          metric_type: counter
          alias: kafka.net.bytes_in
    - include:
      domain: '"kafka.server"'
      bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsMessagesInPerSec"'
      attribute:
        MeanRate:
          metric_type: gauge
          alias: kafka.messages_in

    #
    # Request timings
    #
    - include:
      domain: '"kafka.server"'
      bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsFailedFetchRequestsPerSec"'
      attribute:
        MeanRate:
          metric_type: gauge
          alias: kafka.request.fetch.failed
    - include:
      domain: '"kafka.server"'
      bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsFailedProduceRequestsPerSec"'
      attribute:
        MeanRate:
          metric_type: gauge
          alias: kafka.request.produce.failed
    - include:
      domain: '"kafka.network"'
      bean: '"kafka.network":type="RequestMetrics",name="Produce-TotalTimeMs"'
      attribute:
        Mean:
          metric_type: counter
          alias: kafka.request.produce.time.avg
        99thPercentile:
          metric_type: counter
          alias: kafka.request.produce.time.99percentile
    - include:
      domain: '"kafka.network"'
      bean: '"kafka.network":type="RequestMetrics",name="Fetch-TotalTimeMs"'
      attribute:
        Mean:
          metric_type: counter
          alias: kafka.request.fetch.time.avg
        99thPercentile:
          metric_type: counter
          alias: kafka.request.fetch.time.99percentile
    - include:
      domain: '"kafka.network"'
      bean: '"kafka.network":type="RequestMetrics",name="UpdateMetadata-TotalTimeMs"'
      attribute:
        Mean:
          metric_type: counter
          alias: kafka.request.update_metadata.time.avg
        99thPercentile:
          metric_type: counter
          alias: kafka.request.update_metadata.time.99percentile
    - include:
      domain: '"kafka.network"'
      bean: '"kafka.network":type="RequestMetrics",name="Metadata-TotalTimeMs"'
      attribute:
        Mean:
          metric_type: counter
          alias: kafka.request.metadata.time.avg
        99thPercentile:
          metric_type: counter
          alias: kafka.request.metadata.time.99percentile
    - include:
      domain: '"kafka.network"'
      bean: '"kafka.network":type="RequestMetrics",name="Offsets-TotalTimeMs"'
      attribute:
        Mean:
          metric_type: counter
          alias: kafka.request.offsets.time.avg
        99thPercentile:
          metric_type: counter
          alias: kafka.request.offsets.time.99percentile

    #
    # Replication stats
    #
    - include:
      domain: '"kafka.server"'
      bean: '"kafka.server":type="ReplicaManager",name="ISRShrinksPerSec"'
      attribute:
        MeanRate:
          metric_type: counter
          alias: kafka.replication.isr_shrinks
    - include:
      domain: '"kafka.server"'
      bean: '"kafka.server":type="ReplicaManager",name="ISRExpandsPerSec"'
      attribute:
        MeanRate:
          metric_type: counter
          alias: kafka.replication.isr_expands
    - include:
      domain: '"kafka.server"'
      bean: '"kafka.server":type="ControllerStats",name="LeaderElectionRateAndTimeMs"'
      attribute:
        MeanRate:
          metric_type: counter
          alias: kafka.replication.leader_elections
    - include:
      domain: '"kafka.server"'
      bean: '"kafka.server":type="ControllerStats",name="UncleanLeaderElectionsPerSec"'
      attribute:
        MeanRate:
          metric_type: counter
          alias: kafka.replication.unclean_leader_elections

    #
    # Log flush stats
    #
    - include:
      domain: '"kafka.log"'
      bean: '"kafka.log":type="LogFlushStats",name="LogFlushRateAndTimeMs"'
      attribute:
        MeanRate:
          metric_type: counter
          alias: kafka.log.flush_rate
21 changes: 21 additions & 0 deletions templates/default/kafka_consumer.yaml.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<%# kafka_consumer.yaml.erb — renders the Datadog Agent's kafka_consumer %>
<%# check. Populated by recipes/kafka_consumer.rb from %>
<%# node['datadog']['kafka_consumer']['instances']. Each instance needs %>
<%# kafka_connect_str and zk_connect_str; zk_prefix and consumer_groups %>
<%# (a {group => {topic => [partitions]}} hash) are optional. %>
instances:
<% @instances.each do |i| -%>
  - kafka_connect_str: <%= i['kafka_connect_str'] %>
    zk_connect_str: <%= i['zk_connect_str'] %>
<% if i['zk_prefix'] -%>
    zk_prefix: <%= i['zk_prefix'] %>
<% end -%>
<% if i['consumer_groups'] -%>
<%# BUGFIX: this line previously emitted `consumer_groups: <%%= i['name'] %%>`, %>
<%# which injected the unrelated instance *name* as the mapping's scalar value %>
<%# and produced invalid YAML whenever `name` was set. %>
    consumer_groups:
<% i['consumer_groups'].each do |consumer, topics| -%>
      <%= consumer %>:
<% topics.each do |topic, partitions| -%>
        <%= topic %>: <%= partitions %>
<% end -%>
<% end -%>
<% end -%>
<% end -%>

init_config:
# The Kafka Consumer check does not require any init_config

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Bats integration tests for the Datadog kafka_consumer check config rendered
# by recipes/kafka_consumer.rb (dd-agent-kafka-consumer suite in .kitchen.yml).
# BUGFIX: the test descriptions previously said "kafka.yaml" although the file
# under test is kafka_consumer.yaml.

@test "kafka_consumer.yaml exists" {
  [ -f /etc/dd-agent/conf.d/kafka_consumer.yaml ]
}

@test "kafka_consumer.yaml is correct" {
  export PYTHONPATH=/usr/share/datadog/agent/
  # Re-serialize the YAML as JSON so it can be compared as a single string.
  script='import yaml, json, sys; print json.dumps(yaml.load(sys.stdin.read()))'
  actual=$(python -c "$script" < /etc/dd-agent/conf.d/kafka_consumer.yaml)

  expected='{"instances": [{"zk_connect_str": "localhost:2181", "kafka_connect_str": "localhost:19092", "consumer_groups": {"my_consumer": {"my_topic": [0, 1, 4, 12]}}, "zk_prefix": "/0.8"}], "init_config": null}'

  echo "Expected: $expected"
  echo "Actual: $actual"
  [ "x$actual" == "x$expected" ]
}




19 changes: 19 additions & 0 deletions test/integration/dd-agent-kafka/bats/kafka_config.bats
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Bats integration tests for the Datadog kafka check config rendered by
# recipes/kafka.rb (dd-agent-kafka suite in .kitchen.yml).

@test "kafka.yaml exists" {
  [ -f /etc/dd-agent/conf.d/kafka.yaml ]
}

@test "kafka.yaml is correct" {
  export PYTHONPATH=/usr/share/datadog/agent/
  # Re-serialize the YAML as JSON so it can be compared as a single string.
  script='import yaml, json, sys; print json.dumps(yaml.load(sys.stdin.read()))'
  actual=$(cat /etc/dd-agent/conf.d/kafka.yaml | python -c "$script")

  expected='{"instances": [{"name": "my_kafka", "trust_store_password": "password", "host": "localhost", "trust_store_path": "/path/to/trustStore.jks", "user": "username", "java_bin_path": "/path/to/java", "password": "password", "port": 9999}], "init_config": {"is_jmx": true, "conf": [{"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.net.bytes_out"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsBytesOutPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.net.bytes_in"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsBytesInPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "gauge", "alias": "kafka.messages_in"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsMessagesInPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "gauge", "alias": "kafka.request.fetch.failed"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsFailedFetchRequestsPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "gauge", "alias": "kafka.request.produce.failed"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsFailedProduceRequestsPerSec\""}, {"attribute": {"99thPercentile": {"metric_type": "counter", "alias": "kafka.request.produce.time.99percentile"}, "Mean": {"metric_type": "counter", "alias": "kafka.request.produce.time.avg"}}, "domain": "\"kafka.network\"", "include": null, "bean": "\"kafka.network\":type=\"RequestMetrics\",name=\"Produce-TotalTimeMs\""}, {"attribute": {"99thPercentile": {"metric_type": "counter", "alias": "kafka.request.fetch.time.99percentile"}, "Mean": {"metric_type": "counter", "alias": "kafka.request.fetch.time.avg"}}, "domain": "\"kafka.network\"", "include": null, "bean": "\"kafka.network\":type=\"RequestMetrics\",name=\"Fetch-TotalTimeMs\""}, {"attribute": {"99thPercentile": {"metric_type": "counter", "alias": "kafka.request.update_metadata.time.99percentile"}, "Mean": {"metric_type": "counter", "alias": "kafka.request.update_metadata.time.avg"}}, "domain": "\"kafka.network\"", "include": null, "bean": "\"kafka.network\":type=\"RequestMetrics\",name=\"UpdateMetadata-TotalTimeMs\""}, {"attribute": {"99thPercentile": {"metric_type": "counter", "alias": "kafka.request.metadata.time.99percentile"}, "Mean": {"metric_type": "counter", "alias": "kafka.request.metadata.time.avg"}}, "domain": "\"kafka.network\"", "include": null, "bean": "\"kafka.network\":type=\"RequestMetrics\",name=\"Metadata-TotalTimeMs\""}, {"attribute": {"99thPercentile": {"metric_type": "counter", "alias": "kafka.request.offsets.time.99percentile"}, "Mean": {"metric_type": "counter", "alias": "kafka.request.offsets.time.avg"}}, "domain": "\"kafka.network\"", "include": null, "bean": "\"kafka.network\":type=\"RequestMetrics\",name=\"Offsets-TotalTimeMs\""}, {"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.replication.isr_shrinks"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"ReplicaManager\",name=\"ISRShrinksPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.replication.isr_expands"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"ReplicaManager\",name=\"ISRExpandsPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.replication.leader_elections"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"ControllerStats\",name=\"LeaderElectionRateAndTimeMs\""}, {"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.replication.unclean_leader_elections"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"ControllerStats\",name=\"UncleanLeaderElectionsPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.log.flush_rate"}}, "domain": "\"kafka.log\"", "include": null, "bean": "\"kafka.log\":type=\"LogFlushStats\",name=\"LogFlushRateAndTimeMs\""}]}}'

  echo "Expected: $expected"
  echo "Actual: $actual"
  [ "x$actual" == "x$expected" ]
}