Skip to content

Commit

Permalink
Merge pull request #113 from DataDog/kafka-recipe
Browse files Browse the repository at this point in the history
Kafka recipe
  • Loading branch information
Quentin Francois committed Apr 3, 2014
2 parents 66c200b + 2e66f39 commit b0a0f4d
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 0 deletions.
37 changes: 37 additions & 0 deletions .kitchen.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,40 @@ suites:
instances:
- host: localhost
port: 27017

- name: dd-agent-kafka
run_list:
- recipe[datadog::dd-handler]
- recipe[datadog::kafka]
attributes:
datadog:
api_key: somethingnotnil
application_key: alsonotnil
kafka:
instances:
- host: localhost
port: 9999
name: my_kafka
user: username
password: password
java_bin_path: /path/to/java
trust_store_path: /path/to/trustStore.jks
trust_store_password: password

- name: dd-agent-kafka-consumer
run_list:
- recipe[datadog::dd-handler]
- recipe[datadog::kafka_consumer]
attributes:
datadog:
api_key: somethingnotnil
application_key: alsonotnil
kafka_consumer:
instances:
- kafka_connect_str: localhost:19092
zk_connect_str: localhost:2181
zk_prefix: /0.8
consumer_groups:
my_consumer:
my_topic: [0, 1, 4, 12]

25 changes: 25 additions & 0 deletions recipes/kafka.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
include_recipe "datadog::dd-agent"

# Monitor Kafka
#
# Assuming you have 2 clusters "test" and "prod",
# one with and one without authentication
# you need to set up the following attributes
# node.datadog.kafka.instances = [
# {
# :host => "localhost",
# :port => "9999",
# :name => "prod",
# :username => "username",
# :password => "secret"
# },
# {
# :host => "localhost",
# :port => "8199",
# :name => "test"
# }
# ]

datadog_monitor "kafka" do
instances node['datadog']['kafka']['instances']
end
16 changes: 16 additions & 0 deletions recipes/kafka_consumer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
include_recipe "datadog::dd-agent"

# Monitor Kafka
#
# You need to set up the following attributes
# node.datadog.kafka_consumer.instances = [
# {
# :kafka_connect_str => "localhost:19092",
# :zk_connect_str => "localhost:2181",
# :zk_prefix => "/0.8"
# }
# ]

datadog_monitor "kafka_consumer" do
instances node['datadog']['kafka_consumer']['instances']
end
164 changes: 164 additions & 0 deletions templates/default/kafka.yaml.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
instances:
<% @instances.each do |i| -%>
- host: <%= i['host'] %>
port: <%= i['port'] %>
<% if i['name'] -%>
name: <%= i['name'] %>
<% end -%>
<% if i['user'] -%>
user: <%= i['user'] %>
<% end -%>
<% if i['password'] -%>
password: <%= i['password'] %>
<% end -%>
<% if i['java_bin_path'] -%>
java_bin_path: <%= i['java_bin_path'] %> #Optional, should be set if the agent cannot find your java executable
<% end -%>
<% if i['trust_store_path'] -%>
trust_store_path: <%= i['trust_store_path'] %> # Optional, should be set if ssl is enabled
<% end -%>
<% if i['trust_store_password'] -%>
trust_store_password: <%= i['trust_store_password'] %>
<% end -%>
<% end -%>

init_config:
is_jmx: true

# Metrics collected by this check. You should not have to modify this.
conf:
#
# Aggregate cluster stats
#
- include:
domain: '"kafka.server"'
bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesOutPerSec"'
attribute:
MeanRate:
metric_type: counter
alias: kafka.net.bytes_out
- include:
domain: '"kafka.server"'
bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec"'
attribute:
MeanRate:
metric_type: counter
alias: kafka.net.bytes_in
- include:
domain: '"kafka.server"'
bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsMessagesInPerSec"'
attribute:
MeanRate:
metric_type: gauge
alias: kafka.messages_in

#
# Request timings
#
- include:
domain: '"kafka.server"'
bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsFailedFetchRequestsPerSec"'
attribute:
MeanRate:
metric_type: gauge
alias: kafka.request.fetch.failed
- include:
domain: '"kafka.server"'
bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsFailedProduceRequestsPerSec"'
attribute:
MeanRate:
metric_type: gauge
alias: kafka.request.produce.failed
- include:
domain: '"kafka.network"'
bean: '"kafka.network":type="RequestMetrics",name="Produce-TotalTimeMs"'
attribute:
Mean:
metric_type: counter
alias: kafka.request.produce.time.avg
99thPercentile:
metric_type: counter
alias: kafka.request.produce.time.99percentile
- include:
domain: '"kafka.network"'
bean: '"kafka.network":type="RequestMetrics",name="Fetch-TotalTimeMs"'
attribute:
Mean:
metric_type: counter
alias: kafka.request.fetch.time.avg
99thPercentile:
metric_type: counter
alias: kafka.request.fetch.time.99percentile
- include:
domain: '"kafka.network"'
bean: '"kafka.network":type="RequestMetrics",name="UpdateMetadata-TotalTimeMs"'
attribute:
Mean:
metric_type: counter
alias: kafka.request.update_metadata.time.avg
99thPercentile:
metric_type: counter
alias: kafka.request.update_metadata.time.99percentile
- include:
domain: '"kafka.network"'
bean: '"kafka.network":type="RequestMetrics",name="Metadata-TotalTimeMs"'
attribute:
Mean:
metric_type: counter
alias: kafka.request.metadata.time.avg
99thPercentile:
metric_type: counter
alias: kafka.request.metadata.time.99percentile
- include:
domain: '"kafka.network"'
bean: '"kafka.network":type="RequestMetrics",name="Offsets-TotalTimeMs"'
attribute:
Mean:
metric_type: counter
alias: kafka.request.offsets.time.avg
99thPercentile:
metric_type: counter
alias: kafka.request.offsets.time.99percentile

#
# Replication stats
#
- include:
domain: '"kafka.server"'
bean: '"kafka.server":type="ReplicaManager",name="ISRShrinksPerSec"'
attribute:
MeanRate:
metric_type: counter
alias: kafka.replication.isr_shrinks
- include:
domain: '"kafka.server"'
bean: '"kafka.server":type="ReplicaManager",name="ISRExpandsPerSec"'
attribute:
MeanRate:
metric_type: counter
alias: kafka.replication.isr_expands
- include:
domain: '"kafka.server"'
bean: '"kafka.server":type="ControllerStats",name="LeaderElectionRateAndTimeMs"'
attribute:
MeanRate:
metric_type: counter
alias: kafka.replication.leader_elections
- include:
domain: '"kafka.server"'
bean: '"kafka.server":type="ControllerStats",name="UncleanLeaderElectionsPerSec"'
attribute:
MeanRate:
metric_type: counter
alias: kafka.replication.unclean_leader_elections

#
# Log flush stats
#
- include:
domain: '"kafka.log"'
bean: '"kafka.log":type="LogFlushStats",name="LogFlushRateAndTimeMs"'
attribute:
MeanRate:
metric_type: counter
alias: kafka.log.flush_rate
21 changes: 21 additions & 0 deletions templates/default/kafka_consumer.yaml.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
instances:
<% @instances.each do |i| -%>
- kafka_connect_str: <%= i['kafka_connect_str'] %>
zk_connect_str: <%= i['zk_connect_str'] %>
<% if i['zk_prefix'] -%>
zk_prefix: <%= i['zk_prefix'] %>
<% end -%>
<% if i['consumer_groups'] -%>
consumer_groups: <%= i['name'] %>
<% i["consumer_groups"].each do |consumer, t| -%>
<%= consumer %>:
<% t.each do |topic, l| -%>
<%= topic %>: <%= l %>
<% end -%>
<% end -%>
<% end -%>
<% end -%>

init_config:
# The Kafka Consumer check does not require any init_config

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
@test "kafka.yaml exists" {
[ -f /etc/dd-agent/conf.d/kafka_consumer.yaml ]
}

@test "kafka.yaml is correct" {
export PYTHONPATH=/usr/share/datadog/agent/
script='import yaml, json, sys; print json.dumps(yaml.load(sys.stdin.read()))'
actual=$(cat /etc/dd-agent/conf.d/kafka_consumer.yaml | python -c "$script")

expected='{"instances": [{"zk_connect_str": "localhost:2181", "kafka_connect_str": "localhost:19092", "consumer_groups": {"my_consumer": {"my_topic": [0, 1, 4, 12]}}, "zk_prefix": "/0.8"}], "init_config": null}'

echo "Expected: $expected"
echo "Actual: $actual"
[ "x$actual" == "x$expected" ]
}




19 changes: 19 additions & 0 deletions test/integration/dd-agent-kafka/bats/kafka_config.bats
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
@test "kafka.yaml exists" {
[ -f /etc/dd-agent/conf.d/kafka.yaml ]
}

@test "kafka.yaml is correct" {
export PYTHONPATH=/usr/share/datadog/agent/
script='import yaml, json, sys; print json.dumps(yaml.load(sys.stdin.read()))'
actual=$(cat /etc/dd-agent/conf.d/kafka.yaml | python -c "$script")

expected='{"instances": [{"name": "my_kafka", "trust_store_password": "password", "host": "localhost", "trust_store_path": "/path/to/trustStore.jks", "user": "username", "java_bin_path": "/path/to/java", "password": "password", "port": 9999}], "init_config": {"is_jmx": true, "conf": [{"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.net.bytes_out"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsBytesOutPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.net.bytes_in"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsBytesInPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "gauge", "alias": "kafka.messages_in"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsMessagesInPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "gauge", "alias": "kafka.request.fetch.failed"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsFailedFetchRequestsPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "gauge", "alias": "kafka.request.produce.failed"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsFailedProduceRequestsPerSec\""}, {"attribute": {"99thPercentile": {"metric_type": "counter", "alias": "kafka.request.produce.time.99percentile"}, "Mean": {"metric_type": "counter", "alias": "kafka.request.produce.time.avg"}}, "domain": "\"kafka.network\"", "include": null, "bean": "\"kafka.network\":type=\"RequestMetrics\",name=\"Produce-TotalTimeMs\""}, {"attribute": {"99thPercentile": {"metric_type": "counter", "alias": "kafka.request.fetch.time.99percentile"}, "Mean": {"metric_type": "counter", "alias": "kafka.request.fetch.time.avg"}}, "domain": "\"kafka.network\"", "include": null, "bean": "\"kafka.network\":type=\"RequestMetrics\",name=\"Fetch-TotalTimeMs\""}, {"attribute": {"99thPercentile": {"metric_type": "counter", "alias": "kafka.request.update_metadata.time.99percentile"}, "Mean": {"metric_type": "counter", "alias": "kafka.request.update_metadata.time.avg"}}, "domain": "\"kafka.network\"", "include": null, "bean": "\"kafka.network\":type=\"RequestMetrics\",name=\"UpdateMetadata-TotalTimeMs\""}, {"attribute": {"99thPercentile": {"metric_type": "counter", "alias": "kafka.request.metadata.time.99percentile"}, "Mean": {"metric_type": "counter", "alias": "kafka.request.metadata.time.avg"}}, "domain": "\"kafka.network\"", "include": null, "bean": "\"kafka.network\":type=\"RequestMetrics\",name=\"Metadata-TotalTimeMs\""}, {"attribute": {"99thPercentile": {"metric_type": "counter", "alias": "kafka.request.offsets.time.99percentile"}, "Mean": {"metric_type": "counter", "alias": "kafka.request.offsets.time.avg"}}, "domain": "\"kafka.network\"", "include": null, "bean": "\"kafka.network\":type=\"RequestMetrics\",name=\"Offsets-TotalTimeMs\""}, {"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.replication.isr_shrinks"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"ReplicaManager\",name=\"ISRShrinksPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.replication.isr_expands"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"ReplicaManager\",name=\"ISRExpandsPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.replication.leader_elections"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"ControllerStats\",name=\"LeaderElectionRateAndTimeMs\""}, {"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.replication.unclean_leader_elections"}}, "domain": "\"kafka.server\"", "include": null, "bean": "\"kafka.server\":type=\"ControllerStats\",name=\"UncleanLeaderElectionsPerSec\""}, {"attribute": {"MeanRate": {"metric_type": "counter", "alias": "kafka.log.flush_rate"}}, "domain": "\"kafka.log\"", "include": null, "bean": "\"kafka.log\":type=\"LogFlushStats\",name=\"LogFlushRateAndTimeMs\""}]}}'

echo "Expected: $expected"
echo "Actual: $actual"
[ "x$actual" == "x$expected" ]
}




0 comments on commit b0a0f4d

Please sign in to comment.