From 71bfaaabd66be00a1e628f0c47654c7f08824649 Mon Sep 17 00:00:00 2001 From: Krassimir Valev Date: Wed, 27 Sep 2017 15:01:24 +0200 Subject: [PATCH] Basic RabbitMQ queues metricset (#4788) Basic RabbitMQ queue metricset Related to #3887 --- CHANGELOG.asciidoc | 2 + metricbeat/Makefile | 2 +- metricbeat/docs/fields.asciidoc | 147 ++++++++++++++++++ metricbeat/docs/modules/rabbitmq.asciidoc | 6 +- .../docs/modules/rabbitmq/queue.asciidoc | 19 +++ metricbeat/include/list.go | 1 + metricbeat/metricbeat.reference.yml | 2 +- metricbeat/module/rabbitmq/_meta/config.yml | 2 +- .../_meta/testdata/queue_sample_response.json | 145 +++++++++++++++++ .../module/rabbitmq/node/_meta/data.json | 134 ++++++++++++++-- .../module/rabbitmq/queue/_meta/data.json | 61 ++++++++ .../module/rabbitmq/queue/_meta/docs.asciidoc | 3 + .../module/rabbitmq/queue/_meta/fields.yml | 75 +++++++++ metricbeat/module/rabbitmq/queue/data.go | 79 ++++++++++ .../rabbitmq/queue/node_integration_test.go | 60 +++++++ metricbeat/module/rabbitmq/queue/node_test.go | 76 +++++++++ metricbeat/module/rabbitmq/queue/queue.go | 55 +++++++ metricbeat/modules.d/rabbitmq.yml.disabled | 2 +- 18 files changed, 851 insertions(+), 20 deletions(-) create mode 100644 metricbeat/docs/modules/rabbitmq/queue.asciidoc create mode 100644 metricbeat/module/rabbitmq/_meta/testdata/queue_sample_response.json create mode 100644 metricbeat/module/rabbitmq/queue/_meta/data.json create mode 100644 metricbeat/module/rabbitmq/queue/_meta/docs.asciidoc create mode 100644 metricbeat/module/rabbitmq/queue/_meta/fields.yml create mode 100644 metricbeat/module/rabbitmq/queue/data.go create mode 100644 metricbeat/module/rabbitmq/queue/node_integration_test.go create mode 100644 metricbeat/module/rabbitmq/queue/node_test.go create mode 100644 metricbeat/module/rabbitmq/queue/queue.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 623a20159e5..0b5ec119f1a 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -100,6 +100,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Add http server metricset to support push metrics via http. {pull}4770[4770] - Make config object public for graphite and http server {pull}4820[4820] - Add system uptime metricset. {issue}[4848[4848] +- Add `filesystem.ignore_types` to system module for ignoring filesystem types. {issue}4685[4685] +- Add experimental `queue` metricset to RabbitMQ module. {pull}4788[4788] *Packetbeat* diff --git a/metricbeat/Makefile b/metricbeat/Makefile index 014408c997a..4447e80519c 100644 --- a/metricbeat/Makefile +++ b/metricbeat/Makefile @@ -27,7 +27,7 @@ kibana: # Collects all module and metricset fields .PHONY: fields -fields: +fields: python-env @mkdir -p _meta @cp ${ES_BEATS}/metricbeat/_meta/fields.common.yml _meta/fields.generated.yml @${PYTHON_ENV}/bin/python ${ES_BEATS}/metricbeat/scripts/fields_collector.py >> _meta/fields.generated.yml diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index 8ce16fa9a43..3a8ce9d8277 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -7852,6 +7852,153 @@ type: long Node uptime. +[float] +== queue fields + +queue + + + +[float] +=== `rabbitmq.queue.name` + +type: keyword + +The name of the queue with non-ASCII characters escaped as in C. + + +[float] +=== `rabbitmq.queue.vhost` + +type: keyword + +Virtual host name with non-ASCII characters escaped as in C. + + +[float] +=== `rabbitmq.queue.durable` + +type: boolean + +Whether or not the queue survives server restarts. + + +[float] +=== `rabbitmq.queue.auto_delete` + +type: boolean + +Whether the queue will be deleted automatically when no longer used. + + +[float] +=== `rabbitmq.queue.exclusive` + +type: boolean + +Whether the queue is exclusive (i.e. has owner_pid). + + +[float] +=== `rabbitmq.queue.node` + +type: keyword + +Node name. + + +[float] +=== `rabbitmq.queue.state` + +type: keyword + +The state of the queue. Normally 'running', but may be "{syncing, MsgCount}" if the queue is synchronising. Queues which are located on cluster nodes that are currently down will be shown with a status of 'down'. + + +[float] +=== `rabbitmq.queue.arguments.max_priority` + +type: long + +Maximum number of priority levels for the queue to support. + + +[float] +=== `rabbitmq.queue.consumers.count` + +type: long + +Number of consumers. + + +[float] +=== `rabbitmq.queue.consumers.utilisation.pct` + +type: long + +format: percentage + +Fraction of the time (between 0.0 and 1.0) that the queue is able to immediately deliver messages to consumers. This can be less than 1.0 if consumers are limited by network congestion or prefetch count. + + +[float] +=== `rabbitmq.queue.messages.total.count` + +type: long + +Sum of ready and unacknowledged messages (queue depth). + + +[float] +=== `rabbitmq.queue.messages.ready.count` + +type: long + +Number of messages ready to be delivered to clients. + + +[float] +=== `rabbitmq.queue.messages.unacknowledged.count` + +type: long + +Number of messages delivered to clients but not yet acknowledged. + + +[float] +=== `rabbitmq.queue.messages.persistent.count` + +type: long + +Total number of persistent messages in the queue (will always be 0 for transient queues). + + +[float] +=== `rabbitmq.queue.memory.bytes` + +type: long + +format: bytes + +Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures. + + +[float] +=== `rabbitmq.queue.disk.reads.count` + +type: long + +Total number of times messages have been read from disk by this queue since it started. + + +[float] +=== `rabbitmq.queue.disk.writes.count` + +type: long + +Total number of times messages have been written to disk by this queue since it started. + + [[exported-fields-redis]] == Redis fields diff --git a/metricbeat/docs/modules/rabbitmq.asciidoc b/metricbeat/docs/modules/rabbitmq.asciidoc index 9a30a23f133..4c07b4150ff 100644 --- a/metricbeat/docs/modules/rabbitmq.asciidoc +++ b/metricbeat/docs/modules/rabbitmq.asciidoc @@ -19,7 +19,7 @@ in <>. Here is an example configuration: ---- metricbeat.modules: - module: rabbitmq - metricsets: ["node"] + metricsets: ["node", "queue"] period: 10s hosts: ["localhost:15672"] @@ -34,5 +34,9 @@ The following metricsets are available: * <> +* <> + include::rabbitmq/node.asciidoc[] +include::rabbitmq/queue.asciidoc[] + diff --git a/metricbeat/docs/modules/rabbitmq/queue.asciidoc b/metricbeat/docs/modules/rabbitmq/queue.asciidoc new file mode 100644 index 00000000000..acb8ea89dd6 --- /dev/null +++ b/metricbeat/docs/modules/rabbitmq/queue.asciidoc @@ -0,0 +1,19 @@ +//// +This file is generated! See scripts/docs_collector.py +//// + +[[metricbeat-metricset-rabbitmq-queue]] +include::../../../module/rabbitmq/queue/_meta/docs.asciidoc[] + + +==== Fields + +For a description of each field in the metricset, see the +<> section. + +Here is an example document generated by this metricset: + +[source,json] +---- +include::../../../module/rabbitmq/queue/_meta/data.json[] +---- diff --git a/metricbeat/include/list.go b/metricbeat/include/list.go index dc9cff1a8a5..b06d4359baf 100644 --- a/metricbeat/include/list.go +++ b/metricbeat/include/list.go @@ -87,6 +87,7 @@ import ( _ "github.com/elastic/beats/metricbeat/module/prometheus/stats" _ "github.com/elastic/beats/metricbeat/module/rabbitmq" _ "github.com/elastic/beats/metricbeat/module/rabbitmq/node" + _ "github.com/elastic/beats/metricbeat/module/rabbitmq/queue" _ "github.com/elastic/beats/metricbeat/module/redis" _ "github.com/elastic/beats/metricbeat/module/redis/info" _ "github.com/elastic/beats/metricbeat/module/redis/keyspace" diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 5dba70d55d8..081934a61a1 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -378,7 +378,7 @@ metricbeat.modules: #------------------------------ RabbitMQ Module ------------------------------ - module: rabbitmq - metricsets: ["node"] + metricsets: ["node", "queue"] period: 10s hosts: ["localhost:15672"] diff --git a/metricbeat/module/rabbitmq/_meta/config.yml b/metricbeat/module/rabbitmq/_meta/config.yml index f45427f547b..c0343876425 100644 --- a/metricbeat/module/rabbitmq/_meta/config.yml +++ b/metricbeat/module/rabbitmq/_meta/config.yml @@ -1,5 +1,5 @@ - module: rabbitmq - metricsets: ["node"] + metricsets: ["node", "queue"] period: 10s hosts: ["localhost:15672"] diff --git a/metricbeat/module/rabbitmq/_meta/testdata/queue_sample_response.json b/metricbeat/module/rabbitmq/_meta/testdata/queue_sample_response.json new file mode 100644 index 00000000000..15eb21fe863 --- /dev/null +++ b/metricbeat/module/rabbitmq/_meta/testdata/queue_sample_response.json @@ -0,0 +1,145 @@ +[ + { + "memory": 232720, + "message_stats": { + "disk_reads": 212, + "disk_reads_details": { + "rate": 0 + }, + "disk_writes": 121, + "disk_writes_details": { + "rate": 0 + }, + "deliver": 15, + "deliver_details": { + "rate": 0 + }, + "deliver_no_ack": 0, + "deliver_no_ack_details": { + "rate": 0 + }, + "get": 0, + "get_details": { + "rate": 0 + }, + "get_no_ack": 38, + "get_no_ack_details": { + "rate": 0 + }, + "publish": 121, + "publish_details": { + "rate": 0 + }, + "publish_in": 0, + "publish_in_details": { + "rate": 0 + }, + "publish_out": 0, + "publish_out_details": { + "rate": 0 + }, + "ack": 9, + "ack_details": { + "rate": 0 + }, + "deliver_get": 53, + "deliver_get_details": { + "rate": 0 + }, + "confirm": 0, + "confirm_details": { + "rate": 0 + }, + "return_unroutable": 0, + "return_unroutable_details": { + "rate": 0 + }, + "redeliver": 3, + "redeliver_details": { + "rate": 0 + } + }, + "reductions": 787128, + "reductions_details": { + "rate": 0 + }, + "messages": 74, + "messages_details": { + "rate": 0 + }, + "messages_ready": 71, + "messages_ready_details": { + "rate": 0 + }, + "messages_unacknowledged": 3, + "messages_unacknowledged_details": { + "rate": 0 + }, + "idle_since": "2017-07-28 23:45:52", + "consumer_utilisation": 0.7, + "policy": null, + "exclusive_consumer_tag": null, + "consumers": 3, + "recoverable_slaves": null, + "state": "running", + "garbage_collection": { + "min_bin_vheap_size": 46422, + "min_heap_size": 233, + "fullsweep_after": 65535, + "minor_gcs": 0 + }, + "messages_ram": 74, + "messages_ready_ram": 71, + "messages_unacknowledged_ram": 3, + "messages_persistent": 73, + "message_bytes": 101824, + "message_bytes_ready": 97696, + "message_bytes_unacknowledged": 4128, + "message_bytes_ram": 101824, + "message_bytes_persistent": 101824, + "head_message_timestamp": 1501250275, + "disk_reads": 212, + "disk_writes": 121, + "backing_queue_status": { + "priority_lengths": { + "0": 0, + "1": 71, + "2": 0, + "3": 0, + "4": 0, + "5": 0, + "6": 0, + "7": 0, + "8": 0, + "9": 0 + }, + "mode": "default", + "q1": 0, + "q2": 0, + "delta": [ + "delta", + "todo", + "todo", + "todo" + ], + "q3": 0, + "q4": 71, + "len": 71, + "target_ram_count": "infinity", + "next_seq_id": 121, + "avg_ingress_rate": 0, + "avg_egress_rate": 0.00019793395296866087, + "avg_ack_ingress_rate": 0.00019793395296866087, + "avg_ack_egress_rate": 0.00019793395296866087 + }, + "node": "rabbit@localhost", + "arguments": { + "x-max-priority": 9 + }, + "exclusive": false, + "auto_delete": false, + "durable": true, + "vhost": "/", + "name": "queuenamehere" + } +] \ No newline at end of file diff --git a/metricbeat/module/rabbitmq/node/_meta/data.json b/metricbeat/module/rabbitmq/node/_meta/data.json index 24702fec3a2..afd97479cf6 100644 --- a/metricbeat/module/rabbitmq/node/_meta/data.json +++ b/metricbeat/module/rabbitmq/node/_meta/data.json @@ -1,19 +1,123 @@ { - "@timestamp":"2016-05-23T08:05:34.853Z", - "beat":{ - "hostname":"beathost", - "name":"beathost" + "@timestamp": "2017-08-21T21:06:45.009Z", + "@metadata": { + "beat": "metricbeat", + "type": "doc" }, - "metricset":{ - "host":"localhost", - "module":"rabbitmq", - "name":"node", - "rtt":44269 - }, - "rabbitmq":{ - "node":{ - "example": "node" + "rabbitmq": { + "node": { + "queue": { + "index": { + "journal_write": { + "count": 676 + }, + "read": {}, + "write": { + "count": 92 + } + } + }, + "processors": 8, + "msg": { + "store_read": {}, + "store_write": {} + }, + "run": { + "queue": 0 + }, + "type": "disc", + "disk": { + "free": { + "bytes": 17315815424, + "limit": { + "bytes": 50000000 + } + } + }, + "mnesia": { + "ram": { + "tx": { + "count": 87 + } + }, + "disk": { + "tx": { + "count": 740 + } + } + }, + "socket": { + "used": 0, + "total": 138 + }, + "uptime": 888613658, + "io": { + "file_handle": { + "open_attempt": { + "avg": {} + } + }, + "read": { + "count": 1, + "avg": { + "ms": 0 + }, + "bytes": 1 + }, + "reopen": { + "count": 1 + }, + "seek": { + "count": 450, + "avg": { + "ms": 7 + } + }, + "sync": { + "count": 531, + "avg": { + "ms": 0 + } + }, + "write": { + "avg": { + "ms": 0 + }, + "bytes": 346498, + "count": 531 + } + }, + "fd": { + "total": 256 + }, + "gc": { + "reclaimed": {}, + "num": {} + }, + "name": "rabbit@localhost", + "proc": { + "total": 1048576, + "used": 252 + }, + "mem": { + "limit": { + "bytes": 6309860147 + }, + "used": { + "bytes": 62095872 + } } + } + }, + "metricset": { + "module": "rabbitmq", + "name": "node", + "host": "localhost:15672", + "rtt": 30110 }, - "type":"metricsets" -} + "beat": { + "name": "name", + "hostname": "hostname", + "version": "7.0.0-alpha1" + } + } \ No newline at end of file diff --git a/metricbeat/module/rabbitmq/queue/_meta/data.json b/metricbeat/module/rabbitmq/queue/_meta/data.json new file mode 100644 index 00000000000..ca357e34c24 --- /dev/null +++ b/metricbeat/module/rabbitmq/queue/_meta/data.json @@ -0,0 +1,61 @@ +{ + "@timestamp": "2017-08-21T21:06:47.437Z", + "@metadata": { + "beat": "metricbeat", + "type": "doc" + }, + "rabbitmq": { + "queue": { + "disk": { + "writes": { + "count": 0 + }, + "reads": { + "count": 0 + } + }, + "consumers": { + "count": 5, + "utilisation": {} + }, + "auto_delete": false, + "memory": { + "bytes": 144392 + }, + "state": "running", + "messages": { + "total": { + "count": 0 + }, + "ready": { + "count": 0 + }, + "unacknowledged": { + "count": 0 + }, + "persistent": { + "count": 0 + } + }, + "durable": true, + "vhost": "/", + "arguments": { + "max_priority": 9 + }, + "exclusive": false, + "node": "rabbit@localhost", + "name": "myfancyqueue" + } + }, + "metricset": { + "module": "rabbitmq", + "name": "queue", + "host": "localhost:15672", + "rtt": 6730 + }, + "beat": { + "version": "7.0.0-alpha1", + "name": "name", + "hostname": "hostname" + } + } \ No newline at end of file diff --git a/metricbeat/module/rabbitmq/queue/_meta/docs.asciidoc b/metricbeat/module/rabbitmq/queue/_meta/docs.asciidoc new file mode 100644 index 00000000000..ef655648f98 --- /dev/null +++ b/metricbeat/module/rabbitmq/queue/_meta/docs.asciidoc @@ -0,0 +1,3 @@ +=== rabbitmq queue MetricSet + +This is the queue metricset of the module rabbitmq. diff --git a/metricbeat/module/rabbitmq/queue/_meta/fields.yml b/metricbeat/module/rabbitmq/queue/_meta/fields.yml new file mode 100644 index 00000000000..0fbe15dbab9 --- /dev/null +++ b/metricbeat/module/rabbitmq/queue/_meta/fields.yml @@ -0,0 +1,75 @@ +- name: queue + type: group + description: > + queue + fields: + - name: name + type: keyword + description: > + The name of the queue with non-ASCII characters escaped as in C. + - name: vhost + type: keyword + description: > + Virtual host name with non-ASCII characters escaped as in C. + - name: durable + type: boolean + description: > + Whether or not the queue survives server restarts. + - name: auto_delete + type: boolean + description: > + Whether the queue will be deleted automatically when no longer used. + - name: exclusive + type: boolean + description: > + Whether the queue is exclusive (i.e. has owner_pid). + - name: node + type: keyword + description: > + Node name. + - name: state + type: keyword + description: > + The state of the queue. Normally 'running', but may be "{syncing, MsgCount}" if the queue is synchronising. Queues which are located on cluster nodes that are currently down will be shown with a status of 'down'. + - name: arguments.max_priority + type: long + description: > + Maximum number of priority levels for the queue to support. + - name: consumers.count + type: long + description: > + Number of consumers. + - name: consumers.utilisation.pct + type: long + format: percentage + description: > + Fraction of the time (between 0.0 and 1.0) that the queue is able to immediately deliver messages to consumers. This can be less than 1.0 if consumers are limited by network congestion or prefetch count. + - name: messages.total.count + type: long + description: > + Sum of ready and unacknowledged messages (queue depth). + - name: messages.ready.count + type: long + description: > + Number of messages ready to be delivered to clients. + - name: messages.unacknowledged.count + type: long + description: > + Number of messages delivered to clients but not yet acknowledged. + - name: messages.persistent.count + type: long + description: > + Total number of persistent messages in the queue (will always be 0 for transient queues). + - name: memory.bytes + type: long + format: bytes + description: > + Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures. + - name: disk.reads.count + type: long + description: > + Total number of times messages have been read from disk by this queue since it started. + - name: disk.writes.count + type: long + description: > + Total number of times messages have been written to disk by this queue since it started. diff --git a/metricbeat/module/rabbitmq/queue/data.go b/metricbeat/module/rabbitmq/queue/data.go new file mode 100644 index 00000000000..6c7a04b7f74 --- /dev/null +++ b/metricbeat/module/rabbitmq/queue/data.go @@ -0,0 +1,79 @@ +package queue + +import ( + "encoding/json" + + "github.com/elastic/beats/libbeat/common" + s "github.com/elastic/beats/libbeat/common/schema" + c "github.com/elastic/beats/libbeat/common/schema/mapstriface" + "github.com/elastic/beats/libbeat/logp" +) + +var ( + schema = s.Schema{ + "name": c.Str("name"), + "vhost": c.Str("vhost"), + "durable": c.Bool("durable"), + "auto_delete": c.Bool("auto_delete"), + "exclusive": c.Bool("exclusive"), + "node": c.Str("node"), + "state": c.Str("state"), + "arguments": c.Dict("arguments", s.Schema{ + "max_priority": c.Int("x-max-priority", s.Optional), + }), + "consumers": s.Object{ + "count": c.Int("consumers"), + "utilisation": s.Object{ + "pct": c.Int("consumer_utilisation", s.Optional), + }, + }, + "messages": s.Object{ + "total": s.Object{ + "count": c.Int("messages"), + }, + "ready": s.Object{ + "count": c.Int("messages_ready"), + }, + "unacknowledged": s.Object{ + "count": c.Int("messages_unacknowledged"), + }, + "persistent": s.Object{ + "count": c.Int("messages_persistent"), + }, + }, + "memory": s.Object{ + "bytes": c.Int("memory"), + }, + "disk": s.Object{ + "reads": s.Object{ + "count": c.Int("disk_reads"), + }, + "writes": s.Object{ + "count": c.Int("disk_writes"), + }, + }, + } +) + +func eventsMapping(content []byte) ([]common.MapStr, error) { + var queues []map[string]interface{} + err := json.Unmarshal(content, &queues) + if err != nil { + logp.Err("Error: ", err) + } + + events := []common.MapStr{} + errors := s.NewErrors() + + for _, queue := range queues { + event, errs := eventMapping(queue) + events = append(events, event) + errors.AddErrors(errs) + } + + return events, errors +} + +func eventMapping(queue map[string]interface{}) (common.MapStr, *s.Errors) { + return schema.Apply(queue) +} diff --git a/metricbeat/module/rabbitmq/queue/node_integration_test.go b/metricbeat/module/rabbitmq/queue/node_integration_test.go new file mode 100644 index 00000000000..7801aa7524e --- /dev/null +++ b/metricbeat/module/rabbitmq/queue/node_integration_test.go @@ -0,0 +1,60 @@ +package queue + +import ( + "fmt" + "os" + "testing" + + mbtest "github.com/elastic/beats/metricbeat/mb/testing" +) + +func TestData(t *testing.T) { + f := mbtest.NewEventsFetcher(t, getConfig()) + err := mbtest.WriteEvents(f, t) + if err != nil { + t.Fatal("write", err) + } +} + +func getConfig() map[string]interface{} { + return map[string]interface{}{ + "module": "rabbitmq", + "metricsets": []string{"queue"}, + "hosts": getTestRabbitMQHost(), + "username": getTestRabbitMQUsername(), + "password": getTestRabbitMQPassword(), + } +} + +const ( + rabbitmqDefaultHost = "localhost" + rabbitmqDefaultPort = "15672" + rabbitmqDefaultUsername = "guest" + rabbitmqDefaultPassword = "guest" +) + +func getTestRabbitMQHost() string { + return fmt.Sprintf("%v:%v", + getenv("RABBITMQ_HOST", rabbitmqDefaultHost), + getenv("RABBITMQ_PORT", rabbitmqDefaultPort), + ) +} + +func getTestRabbitMQUsername() string { + return getenv("RABBITMQ_USERNAME", rabbitmqDefaultUsername) +} + +func getTestRabbitMQPassword() string { + return getenv("RABBITMQ_PASSWORD", rabbitmqDefaultPassword) +} + +func getenv(name, defaultValue string) string { + return strDefault(os.Getenv(name), defaultValue) +} + +func strDefault(a, defaults string) string { + if len(a) == 0 { + return defaults + } + return a +} diff --git a/metricbeat/module/rabbitmq/queue/node_test.go b/metricbeat/module/rabbitmq/queue/node_test.go new file mode 100644 index 00000000000..24cfbea8f1f --- /dev/null +++ b/metricbeat/module/rabbitmq/queue/node_test.go @@ -0,0 +1,76 @@ +package queue + +import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "path/filepath" + "testing" + + "github.com/elastic/beats/libbeat/common" + mbtest "github.com/elastic/beats/metricbeat/mb/testing" + + "github.com/stretchr/testify/assert" +) + +func TestFetchEventContents(t *testing.T) { + absPath, err := filepath.Abs("../_meta/testdata/") + + response, err := ioutil.ReadFile(absPath + "/queue_sample_response.json") + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(response)) + })) + defer server.Close() + + config := map[string]interface{}{ + "module": "rabbitmq", + "metricsets": []string{"queue"}, + "hosts": []string{server.URL}, + } + + f := mbtest.NewEventsFetcher(t, config) + events, err := f.Fetch() + event := events[0] + if !assert.NoError(t, err) { + t.FailNow() + } + + t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event.StringToPrint()) + + assert.EqualValues(t, "queuenamehere", event["name"]) + assert.EqualValues(t, "/", event["vhost"]) + assert.EqualValues(t, true, event["durable"]) + assert.EqualValues(t, false, event["auto_delete"]) + assert.EqualValues(t, false, event["exclusive"]) + assert.EqualValues(t, "running", event["state"]) + assert.EqualValues(t, "rabbit@localhost", event["node"]) + + arguments := event["arguments"].(common.MapStr) + assert.EqualValues(t, 9, arguments["max_priority"]) + + consumers := event["consumers"].(common.MapStr) + utilisation := consumers["utilisation"].(common.MapStr) + assert.EqualValues(t, 3, consumers["count"]) + assert.EqualValues(t, 0.7, utilisation["pct"]) + + memory := event["memory"].(common.MapStr) + assert.EqualValues(t, 232720, memory["bytes"]) + + messages := event["messages"].(common.MapStr) + total := messages["total"].(common.MapStr) + ready := messages["ready"].(common.MapStr) + unacknowledged := messages["unacknowledged"].(common.MapStr) + persistent := messages["persistent"].(common.MapStr) + assert.EqualValues(t, 74, total["count"]) + assert.EqualValues(t, 71, ready["count"]) + assert.EqualValues(t, 3, unacknowledged["count"]) + assert.EqualValues(t, 73, persistent["count"]) + + disk := event["disk"].(common.MapStr) + reads := disk["reads"].(common.MapStr) + writes := disk["writes"].(common.MapStr) + assert.EqualValues(t, 212, reads["count"]) + assert.EqualValues(t, 121, writes["count"]) +} diff --git a/metricbeat/module/rabbitmq/queue/queue.go b/metricbeat/module/rabbitmq/queue/queue.go new file mode 100644 index 00000000000..688361fe713 --- /dev/null +++ b/metricbeat/module/rabbitmq/queue/queue.go @@ -0,0 +1,55 @@ +package queue + +import ( + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/metricbeat/helper" + "github.com/elastic/beats/metricbeat/mb" + "github.com/elastic/beats/metricbeat/mb/parse" +) + +const ( + defaultScheme = "http" + defaultPath = "/api/queues" +) + +var ( + hostParser = parse.URLHostParserBuilder{ + DefaultScheme: defaultScheme, + DefaultPath: defaultPath, + }.Build() +) + +func init() { + if err := mb.Registry.AddMetricSet("rabbitmq", "queue", New, hostParser); err != nil { + panic(err) + } +} + +type MetricSet struct { + mb.BaseMetricSet + *helper.HTTP +} + +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + cfgwarn.Experimental("The rabbitmq queue metricset is experimental") + + http := helper.NewHTTP(base) + http.SetHeader("Accept", "application/json") + + return &MetricSet{ + base, + http, + }, nil +} + +func (m *MetricSet) Fetch() ([]common.MapStr, error) { + content, err := m.HTTP.FetchContent() + + if err != nil { + return nil, err + } + + events, _ := eventsMapping(content) + return events, nil +} diff --git a/metricbeat/modules.d/rabbitmq.yml.disabled b/metricbeat/modules.d/rabbitmq.yml.disabled index f45427f547b..c0343876425 100644 --- a/metricbeat/modules.d/rabbitmq.yml.disabled +++ b/metricbeat/modules.d/rabbitmq.yml.disabled @@ -1,5 +1,5 @@ - module: rabbitmq - metricsets: ["node"] + metricsets: ["node", "queue"] period: 10s hosts: ["localhost:15672"]