Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

trim topic name also #2

Open
wants to merge 4 commits into
base: activemq
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ configuration options.

## Input Plugins

* [activemq](./plugins/inputs/activemq)
* [aerospike](./plugins/inputs/aerospike)
* [amqp_consumer](./plugins/inputs/amqp_consumer) (rabbitmq)
* [apache](./plugins/inputs/apache)
Expand Down
69 changes: 69 additions & 0 deletions plugins/inputs/activemq/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Telegraf Input Plugin: ActiveMQ

This plugin gather queues, topics & subscribers metrics using ActiveMQ Console API.

### Configuration:

```toml
# Description
[[inputs.activemq]]
## Required ActiveMQ Endpoint
server = "192.168.50.10"
## Required ActiveMQ port
port = 8161
## Required username used for request HTTP Basic Authentication
username = "admin"
## Required password used for HTTP Basic Authentication
password = "admin"
## Required ActiveMQ webadmin root path
webadmin = "admin"
```

### Measurements & Fields:

Every effort was made to preserve the names based on the XML response from the ActiveMQ Console API.

- queues_metrics:
- size
- consumer_count
- enqueue_count
- dequeue_count
- topics_metrics:
- size
- consumer_count
- enqueue_count
- dequeue_count
- subscribers_metrics:
- pending_queue_size
- dispatched_queue_size
- dispatched_counter
- enqueue_counter
- dequeue_counter

### Tags:

- queues_metrics:
- name
- topics_metrics:
- name
- subscribers_metrics:
- client_id
- subscription_name
- connection_id
- destination_name
- selector
- active

### Example Output:

```
$ ./telegraf -config telegraf.conf -input-filter activemq -test
queues_metrics,name=sandra,host=88284b2fe51b consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000
queues_metrics,name=Test,host=88284b2fe51b dequeue_count=0i,size=0i,consumer_count=0i,enqueue_count=0i 1492610703000000000
topics_metrics,name=ActiveMQ.Advisory.MasterBroker\ ,host=88284b2fe51b size=0i,consumer_count=0i,enqueue_count=1i,dequeue_count=0i 1492610703000000000
topics_metrics,host=88284b2fe51b,name=AAA\ size=0i,consumer_count=1i,enqueue_count=0i,dequeue_count=0i 1492610703000000000
topics_metrics,name=ActiveMQ.Advisory.Topic\ ,host=88284b2fe51b enqueue_count=1i,dequeue_count=0i,size=0i,consumer_count=0i 1492610703000000000
topics_metrics,name=ActiveMQ.Advisory.Queue\ ,host=88284b2fe51b size=0i,consumer_count=0i,enqueue_count=2i,dequeue_count=0i 1492610703000000000
topics_metrics,name=AAAA\ ,host=88284b2fe51b consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000
subscribers_metrics,connection_id=NOTSET,destination_name=AAA,selector=AA,active=no,host=88284b2fe51b,client_id=AAA,subscription_name=AAA pending_queue_size=0i,dispatched_queue_size=0i,dispatched_counter=0i,enqueue_counter=0i,dequeue_counter=0i 1492610703000000000
```
204 changes: 204 additions & 0 deletions plugins/inputs/activemq/activemq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package activemq

import (
"encoding/xml"
"fmt"
"io/ioutil"
"net/http"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"strings"
)

type ActiveMQ struct {
Server string `json:"server"`
Port int `json:"port"`
Username string `json:"username"`
Password string `json:"password"`
Webadmin string `json:"webadmin"`
}

type Topics struct {
XMLName xml.Name `xml:"topics"`
TopicItems []Topic `xml:"topic"`
}

type Topic struct {
XMLName xml.Name `xml:"topic"`
Name string `xml:"name,attr"`
Stats Stats `xml:"stats"`
}

type Subscribers struct {
XMLName xml.Name `xml:"subscribers"`
SubscriberItems []Subscriber `xml:"subscriber"`
}

type Subscriber struct {
XMLName xml.Name `xml:"subscriber"`
ClientId string `xml:"clientId,attr"`
SubscriptionName string `xml:"subscriptionName,attr"`
ConnectionId string `xml:"connectionId,attr"`
DestinationName string `xml:"destinationName,attr"`
Selector string `xml:"selector,attr"`
Active string `xml:"active,attr"`
Stats Stats `xml:"stats"`
}

type Queues struct {
XMLName xml.Name `xml:"queues"`
QueueItems []Queue `xml:"queue"`
}

type Queue struct {
XMLName xml.Name `xml:"queue"`
Name string `xml:"name,attr"`
Stats Stats `xml:"stats"`
}

type Stats struct {
XMLName xml.Name `xml:"stats"`
Size int `xml:"size,attr"`
ConsumerCount int `xml:"consumerCount,attr"`
EnqueueCount int `xml:"enqueueCount,attr"`
DequeueCount int `xml:"dequeueCount,attr"`
PendingQueueSize int `xml:"pendingQueueSize,attr"`
DispatchedQueueSize int `xml:"dispatchedQueueSize,attr"`
DispatchedCounter int `xml:"dispatchedCounter,attr"`
EnqueueCounter int `xml:"enqueueCounter,attr"`
DequeueCounter int `xml:"dequeueCounter,attr"`
}

const (
QUEUES_STATS = "queues"
TOPICS_STATS = "topics"
SUBSCRIBERS_STATS = "subscribers"
)

var sampleConfig = `
## Required ActiveMQ Endpoint
# server = "192.168.50.10"
## Required ActiveMQ port
# port = 8161
## Required username used for request HTTP Basic Authentication
# username = "admin"
## Required password used for HTTP Basic Authentication
# password = "admin"
## Required ActiveMQ webadmin root path
# webadmin = "admin"
`

func (a *ActiveMQ) Description() string {
return "Gather ActiveMQ metrics"
}

func (a *ActiveMQ) SampleConfig() string {
return sampleConfig
}

func (a *ActiveMQ) GetMetrics(keyword string) ([]byte, error) {
client := &http.Client{}
url := fmt.Sprintf("http://%s:%d/%s/xml/%s.jsp", a.Server, a.Port, a.Webadmin, keyword)

req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}

req.SetBasicAuth(a.Username, a.Password)
resp, err := client.Do(req)
if err != nil {
return nil, err
}

defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}

func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues Queues) {
for _, queue := range queues.QueueItems {
records := make(map[string]interface{})
tags := make(map[string]string)

tags["name"] = strings.TrimSpace(queue.Name)

records["size"] = queue.Stats.Size
records["consumer_count"] = queue.Stats.ConsumerCount
records["enqueue_count"] = queue.Stats.EnqueueCount
records["dequeue_count"] = queue.Stats.DequeueCount

acc.AddFields("queues_metrics", records, tags)
}
}

func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics Topics) {
for _, topic := range topics.TopicItems {
records := make(map[string]interface{})
tags := make(map[string]string)

tags["name"] = strings.TrimSpace(topic.Name)

records["size"] = topic.Stats.Size
records["consumer_count"] = topic.Stats.ConsumerCount
records["enqueue_count"] = topic.Stats.EnqueueCount
records["dequeue_count"] = topic.Stats.DequeueCount

acc.AddFields("topics_metrics", records, tags)
}
}

func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscribers Subscribers) {
for _, subscriber := range subscribers.SubscriberItems {
records := make(map[string]interface{})
tags := make(map[string]string)

tags["client_id"] = subscriber.ClientId
tags["subscription_name"] = subscriber.SubscriptionName
tags["connection_id"] = subscriber.ConnectionId
tags["destination_name"] = subscriber.DestinationName
tags["selector"] = subscriber.Selector
tags["active"] = subscriber.Active

records["pending_queue_size"] = subscriber.Stats.PendingQueueSize
records["dispatched_queue_size"] = subscriber.Stats.DispatchedQueueSize
records["dispatched_counter"] = subscriber.Stats.DispatchedCounter
records["enqueue_counter"] = subscriber.Stats.EnqueueCounter
records["dequeue_counter"] = subscriber.Stats.DequeueCounter

acc.AddFields("subscribers_metrics", records, tags)
}
}

func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error {
dataQueues, err := a.GetMetrics(QUEUES_STATS)
queues := Queues{}
err = xml.Unmarshal(dataQueues, &queues)
if err != nil {
return err
}

dataTopics, err := a.GetMetrics(TOPICS_STATS)
topics := Topics{}
err = xml.Unmarshal(dataTopics, &topics)
if err != nil {
return err
}

dataSubscribers, err := a.GetMetrics(SUBSCRIBERS_STATS)
subscribers := Subscribers{}
err = xml.Unmarshal(dataSubscribers, &subscribers)
if err != nil {
return err
}

a.GatherQueuesMetrics(acc, queues)
a.GatherTopicsMetrics(acc, topics)
a.GatherSubscribersMetrics(acc, subscribers)

return nil
}

func init() {
inputs.Add("activemq", func() telegraf.Input { return &ActiveMQ{} })
}
Loading