diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index f0b1d017e00..894d3c37827 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -43,6 +43,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff] - Update logstash.node_stats metricset to write data under `logstash.node.stats.*`. {pull}6714[6714] - Fixed typo in values for `state_container` `status.phase`, from `terminate` to `terminated`. {pull}6916[6916] - RabbitMQ management plugin path is now configured at the module level instead of having to do it in each of the metricsets. New `management_path_prefix` option should be used now {pull}7074[7074] +- RabbitMQ node metricset only collects metrics of the instance it connects to, `node.collect: cluster` can be used to collect all nodes as before. {issue}6556[6556] {pull}6971[6971] *Packetbeat* diff --git a/metricbeat/module/rabbitmq/_meta/testdata/node_sample_response.json b/metricbeat/module/rabbitmq/_meta/testdata/node_sample_response.json index dd3f73eb6b3..c7c7c1fd60f 100644 --- a/metricbeat/module/rabbitmq/_meta/testdata/node_sample_response.json +++ b/metricbeat/module/rabbitmq/_meta/testdata/node_sample_response.json @@ -1,312 +1,337 @@ -[ - { - "applications": [ - { - "description": "RabbitMQ AMQP Client", - "name": "amqp_client", - "version": "3.6.9" - }, - { - "description": "The Erlang ASN1 compiler version 4.0.4", - "name": "asn1", - "version": "4.0.4" - }, - { - "description": "ERTS CXC 138 10", - "name": "compiler", - "version": "7.0.4" - }, - { - "description": "Small, fast, modular HTTP server.", - "name": "cowboy", - "version": "1.0.4" - }, - { - "description": "Support library for manipulating Web protocols.", - "name": "cowlib", - "version": "1.0.2" - }, - { - "description": "CRYPTO", - "name": "crypto", - "version": "3.7.3" - }, - { - "description": "INETS CXC 138 49", - "name": "inets", - "version": "6.3.6" - }, - { - "description": "ERTS CXC 138 10", - "name": "kernel", - "version": "5.2" - }, - { - "description": "MNESIA CXC 138 12", - "name": "mnesia", - "version": "4.14.3" - }, - { - "description": "CPO CXC 138 46", - "name": "os_mon", - "version": "2.4.2" - }, - { - "description": "Public key infrastructure", - "name": "public_key", - "version": "1.4" - }, - { - "description": "RabbitMQ", - "name": "rabbit", - "version": "3.6.9" - }, - { - "description": "Modules shared by rabbitmq-server and rabbitmq-erlang-client", - "name": "rabbit_common", - "version": "3.6.9" - }, - { - "description": "RabbitMQ Management Console", - "name": "rabbitmq_management", - "version": "3.6.9" - }, - { - "description": "RabbitMQ Management Agent", - "name": "rabbitmq_management_agent", - "version": "3.6.9" - }, - { - "description": "RabbitMQ Web Dispatcher", - "name": "rabbitmq_web_dispatch", - "version": "3.6.9" - }, - { - "description": "Socket acceptor pool for TCP protocols.", - "name": "ranch", - "version": "1.3.0" - }, - { - "description": "SASL CXC 138 11", - "name": "sasl", - "version": "3.0.3" - }, - { - "description": "Erlang/OTP SSL application", - "name": "ssl", - "version": "8.1.1" - }, - { - "description": "ERTS CXC 138 10", - "name": "stdlib", - "version": "3.3" - }, - { - "description": "Syntax tools", - "name": "syntax_tools", - "version": "2.1.1" - }, - { - "description": "XML parser", - "name": "xmerl", - "version": "1.3.13" - } - ], - "auth_mechanisms": [ - { - "description": "SASL PLAIN authentication mechanism", - "enabled": true, - "name": "PLAIN" - }, - { - "description": "RabbitMQ Demo challenge-response authentication mechanism", - "enabled": false, - "name": "RABBIT-CR-DEMO" - }, - { - "description": "QPid AMQPLAIN mechanism", - "enabled": true, - "name": "AMQPLAIN" - } - ], - "cluster_links": [], - "config_files": [ - "/etc/rabbitmq/rabbitmq.config" - ], - "context_switches": 75445, - "context_switches_details": { - "rate": 37.8 - }, - "contexts": [ - { - "description": "RabbitMQ Management", - "path": "/", - "port": "15672", - "ssl": "false" - } - ], - "db_dir": "/var/lib/rabbitmq/mnesia/rabbit@prcdsrvv1682", - "disk_free": 313352192, - "disk_free_alarm": false, - "disk_free_details": { - "rate": 929792.0 - }, - "disk_free_limit": 50000000, - "enabled_plugins": [ - "rabbitmq_management" - ], - "exchange_types": [ - { - "description": "AMQP direct exchange, as per the AMQP specification", - "enabled": true, - "name": "direct" - }, - { - "description": "AMQP topic exchange, as per the AMQP specification", - "enabled": true, - "name": "topic" - }, - { - "description": "AMQP fanout exchange, as per the AMQP specification", - "enabled": true, - "name": "fanout" - }, - { - "description": "AMQP headers exchange, as per the AMQP specification", - "enabled": true, - "name": "headers" - } - ], - "fd_total": 65536, - "fd_used": 54, - "fd_used_details": { - "rate": 0.0 - }, - "gc_bytes_reclaimed": 270119840, - "gc_bytes_reclaimed_details": { - "rate": 85476.8 - }, - "gc_num": 3184, - "gc_num_details": { - "rate": 9.0 - }, - "io_file_handle_open_attempt_avg_time": 0.1362, - "io_file_handle_open_attempt_avg_time_details": { - "rate": 0.0 - }, - "io_file_handle_open_attempt_count": 10, - "io_file_handle_open_attempt_count_details": { - "rate": 0.0 - }, - "io_read_avg_time": 33.969, - "io_read_avg_time_details": { - "rate": 0.0 - }, - "io_read_bytes": 1, - "io_read_bytes_details": { - "rate": 0.0 - }, - "io_read_count": 1, - "io_read_count_details": { - "rate": 0.0 - }, - "io_reopen_count": 0, - "io_reopen_count_details": { - "rate": 0.0 - }, - "io_seek_avg_time": 0.0, - "io_seek_avg_time_details": { - "rate": 0.0 - }, - "io_seek_count": 0, - "io_seek_count_details": { - "rate": 0.0 - }, - "io_sync_avg_time": 0.0, - "io_sync_avg_time_details": { - "rate": 0.0 - }, - "io_sync_count": 0, - "io_sync_count_details": { - "rate": 0.0 - }, - "io_write_avg_time": 0.0, - "io_write_avg_time_details": { - "rate": 0.0 - }, - "io_write_bytes": 0, - "io_write_bytes_details": { - "rate": 0.0 - }, - "io_write_count": 0, - "io_write_count_details": { - "rate": 0.0 - }, - "log_file": "tty", - "mem_alarm": false, - "mem_limit": 413047193, - "mem_used": 57260080, - "mem_used_details": { - "rate": -1910.4 - }, - "metrics_gc_queue_length": { - "channel_closed": 0, - "channel_consumer_deleted": 0, - "connection_closed": 0, - "consumer_deleted": 0, - "exchange_deleted": 0, - "node_node_deleted": 0, - "queue_deleted": 0, - "vhost_deleted": 0 - }, - "mnesia_disk_tx_count": 0, - "mnesia_disk_tx_count_details": { - "rate": 0.0 - }, - "mnesia_ram_tx_count": 11, - "mnesia_ram_tx_count_details": { - "rate": 0.0 - }, - "msg_store_read_count": 0, - "msg_store_read_count_details": { - "rate": 0.0 - }, - "msg_store_write_count": 0, - "msg_store_write_count_details": { - "rate": 0.0 - }, - "name": "rabbit@prcdsrvv1682", - "net_ticktime": 60, - "os_pid": "114", - "partitions": [], - "proc_total": 1048576, - "proc_used": 322, - "proc_used_details": { - "rate": 0.0 - }, - "processors": 2, - "queue_index_journal_write_count": 0, - "queue_index_journal_write_count_details": { - "rate": 0.0 - }, - "queue_index_read_count": 0, - "queue_index_read_count_details": { - "rate": 0.0 - }, - "queue_index_write_count": 0, - "queue_index_write_count_details": { - "rate": 0.0 - }, - "rates_mode": "basic", - "run_queue": 0, - "running": true, - "sasl_log_file": "tty", - "sockets_total": 58890, - "sockets_used": 0, - "sockets_used_details": { - "rate": 0.0 - }, - "type": "disc", - "uptime": 37139 +{ + "partitions": [], + "os_pid": "133", + "fd_total": 1048576, + "sockets_total": 943626, + "mem_limit": 6628692787, + "mem_alarm": false, + "disk_free_limit": 50000000, + "disk_free_alarm": false, + "proc_total": 1048576, + "rates_mode": "basic", + "uptime": 98754834, + "run_queue": 0, + "processors": 4, + "exchange_types": [ + { + "name": "fanout", + "description": "AMQP fanout exchange, as per the AMQP specification", + "enabled": true + }, + { + "name": "topic", + "description": "AMQP topic exchange, as per the AMQP specification", + "enabled": true + }, + { + "name": "direct", + "description": "AMQP direct exchange, as per the AMQP specification", + "enabled": true + }, + { + "name": "headers", + "description": "AMQP headers exchange, as per the AMQP specification", + "enabled": true + } + ], + "auth_mechanisms": [ + { + "name": "AMQPLAIN", + "description": "QPid AMQPLAIN mechanism", + "enabled": true + }, + { + "name": "PLAIN", + "description": "SASL PLAIN authentication mechanism", + "enabled": true + }, + { + "name": "RABBIT-CR-DEMO", + "description": "RabbitMQ Demo challenge-response authentication mechanism", + "enabled": false + } + ], + "applications": [ + { + "name": "amqp_client", + "description": "RabbitMQ AMQP Client", + "version": "3.7.4" + }, + { + "name": "asn1", + "description": "The Erlang ASN1 compiler version 5.0.4", + "version": "5.0.4" + }, + { + "name": "compiler", + "description": "ERTS CXC 138 10", + "version": "7.1.4" + }, + { + "name": "cowboy", + "description": "Small, fast, modern HTTP server.", + "version": "2.2.2" + }, + { + "name": "cowlib", + "description": "Support library for manipulating Web protocols.", + "version": "2.1.0" + }, + { + "name": "crypto", + "description": "CRYPTO", + "version": "4.2" + }, + { + "name": "goldrush", + "description": "Erlang event stream processor", + "version": "0.1.9" + }, + { + "name": "inets", + "description": "INETS CXC 138 49", + "version": "6.4.5" + }, + { + "name": "jsx", + "description": "a streaming, evented json parsing toolkit", + "version": "2.8.2" + }, + { + "name": "kernel", + "description": "ERTS CXC 138 10", + "version": "5.4.2" + }, + { + "name": "lager", + "description": "Erlang logging framework", + "version": "3.5.1" + }, + { + "name": "mnesia", + "description": "MNESIA CXC 138 12", + "version": "4.15.3" + }, + { + "name": "os_mon", + "description": "CPO CXC 138 46", + "version": "2.4.4" + }, + { + "name": "public_key", + "description": "Public key infrastructure", + "version": "1.5.2" + }, + { + "name": "rabbit", + "description": "RabbitMQ", + "version": "3.7.4" + }, + { + "name": "rabbit_common", + "description": "Modules shared by rabbitmq-server and rabbitmq-erlang-client", + "version": "3.7.4" + }, + { + "name": "rabbitmq_management", + "description": "RabbitMQ Management Console", + "version": "3.7.4" + }, + { + "name": "rabbitmq_management_agent", + "description": "RabbitMQ Management Agent", + "version": "3.7.4" + }, + { + "name": "rabbitmq_web_dispatch", + "description": "RabbitMQ Web Dispatcher", + "version": "3.7.4" + }, + { + "name": "ranch", + "description": "Socket acceptor pool for TCP protocols.", + "version": "1.4.0" + }, + { + "name": "ranch_proxy_protocol", + "description": "Ranch Proxy Protocol Transport", + "version": "1.4.4" + }, + { + "name": "recon", + "description": "Diagnostic tools for production use", + "version": "2.3.2" + }, + { + "name": "sasl", + "description": "SASL CXC 138 11", + "version": "3.1.1" + }, + { + "name": "ssl", + "description": "Erlang/OTP SSL application", + "version": "8.2.3" + }, + { + "name": "stdlib", + "description": "ERTS CXC 138 10", + "version": "3.4.3" + }, + { + "name": "syntax_tools", + "description": "Syntax tools", + "version": "2.1.4" + }, + { + "name": "xmerl", + "description": "XML parser", + "version": "1.3.16" + } + ], + "contexts": [ + { + "description": "RabbitMQ Management", + "path": "/", + "ssl": "false", + "port": "15672" } -] + ], + "log_files": [ + "" + ], + "db_dir": "/var/lib/rabbitmq/mnesia/rabbit@e2b1ae6390fd", + "config_files": [ + "/etc/rabbitmq/rabbitmq.conf" + ], + "net_ticktime": 60, + "enabled_plugins": [ + "rabbitmq_management" + ], + "mem_calculation_strategy": "rss", + "name": "rabbit@e2b1ae6390fd", + "type": "disc", + "running": true, + "mem_used": 105504768, + "mem_used_details": { + "rate": -3276.8 + }, + "fd_used": 31, + "fd_used_details": { + "rate": 0 + }, + "sockets_used": 3, + "sockets_used_details": { + "rate": 0 + }, + "proc_used": 403, + "proc_used_details": { + "rate": 0 + }, + "disk_free": 98317942784, + "disk_free_details": { + "rate": -6553.6 + }, + "gc_num": 1049055, + "gc_num_details": { + "rate": 20 + }, + "gc_bytes_reclaimed": 27352751800, + "gc_bytes_reclaimed_details": { + "rate": 529257.6 + }, + "context_switches": 5377028, + "context_switches_details": { + "rate": 114.6 + }, + "io_read_count": 3, + "io_read_count_details": { + "rate": 0 + }, + "io_read_bytes": 1, + "io_read_bytes_details": { + "rate": 0 + }, + "io_read_avg_time": 0.063, + "io_read_avg_time_details": { + "rate": 0 + }, + "io_write_count": 149402, + "io_write_count_details": { + "rate": 4 + }, + "io_write_bytes": 36305460, + "io_write_bytes_details": { + "rate": 972 + }, + "io_write_avg_time": 0.23600591692212955, + "io_write_avg_time_details": { + "rate": 0.2701 + }, + "io_sync_count": 149402, + "io_sync_count_details": { + "rate": 4 + }, + "io_sync_avg_time": 2.776535053078272, + "io_sync_avg_time_details": { + "rate": 1.9805 + }, + "io_seek_count": 23, + "io_seek_count_details": { + "rate": 0 + }, + "io_seek_avg_time": 0.077, + "io_seek_avg_time_details": { + "rate": 0 + }, + "io_reopen_count": 0, + "io_reopen_count_details": { + "rate": 0 + }, + "mnesia_ram_tx_count": 92, + "mnesia_ram_tx_count_details": { + "rate": 0 + }, + "mnesia_disk_tx_count": 1, + "mnesia_disk_tx_count_details": { + "rate": 0 + }, + "msg_store_read_count": 0, + "msg_store_read_count_details": { + "rate": 0 + }, + "msg_store_write_count": 0, + "msg_store_write_count_details": { + "rate": 0 + }, + "queue_index_journal_write_count": 448230, + "queue_index_journal_write_count_details": { + "rate": 12 + }, + "queue_index_write_count": 2, + "queue_index_write_count_details": { + "rate": 0 + }, + "queue_index_read_count": 0, + "queue_index_read_count_details": { + "rate": 0 + }, + "io_file_handle_open_attempt_count": 597670, + "io_file_handle_open_attempt_count_details": { + "rate": 16 + }, + "io_file_handle_open_attempt_avg_time": 0.004204301704954239, + "io_file_handle_open_attempt_avg_time_details": { + "rate": 0.0052875 + }, + "cluster_links": [], + "metrics_gc_queue_length": { + "connection_closed": 0, + "channel_closed": 0, + "consumer_deleted": 0, + "exchange_deleted": 0, + "queue_deleted": 0, + "vhost_deleted": 0, + "node_node_deleted": 0, + "channel_consumer_deleted": 0 + } +} diff --git a/metricbeat/module/rabbitmq/_meta/testdata/overview_sample_response.json b/metricbeat/module/rabbitmq/_meta/testdata/overview_sample_response.json new file mode 100644 index 00000000000..6351606ffb4 --- /dev/null +++ b/metricbeat/module/rabbitmq/_meta/testdata/overview_sample_response.json @@ -0,0 +1,147 @@ +{ + "management_version": "3.7.4", + "rates_mode": "basic", + "exchange_types": [ + { + "name": "fanout", + "description": "AMQP fanout exchange, as per the AMQP specification", + "enabled": true + }, + { + "name": "topic", + "description": "AMQP topic exchange, as per the AMQP specification", + "enabled": true + }, + { + "name": "direct", + "description": "AMQP direct exchange, as per the AMQP specification", + "enabled": true + }, + { + "name": "headers", + "description": "AMQP headers exchange, as per the AMQP specification", + "enabled": true + } + ], + "rabbitmq_version": "3.7.4", + "cluster_name": "rabbit@e2b1ae6390fd", + "erlang_version": "20.2.4", + "erlang_full_version": "Erlang/OTP 20 [erts-9.2.1] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:64] [hipe] [kernel-poll:true]", + "message_stats": { + "ack": 150400, + "ack_details": { + "rate": 0 + }, + "confirm": 0, + "confirm_details": { + "rate": 0 + }, + "deliver": 200494, + "deliver_details": { + "rate": 0 + }, + "deliver_get": 200494, + "deliver_get_details": { + "rate": 0 + }, + "deliver_no_ack": 0, + "deliver_no_ack_details": { + "rate": 0 + }, + "disk_reads": 0, + "disk_reads_details": { + "rate": 0 + }, + "disk_writes": 150396, + "disk_writes_details": { + "rate": 0 + }, + "get": 0, + "get_details": { + "rate": 0 + }, + "get_no_ack": 0, + "get_no_ack_details": { + "rate": 0 + }, + "publish": 150395, + "publish_details": { + "rate": 0 + }, + "redeliver": 50094, + "redeliver_details": { + "rate": 0 + }, + "return_unroutable": 0, + "return_unroutable_details": { + "rate": 0 + } + }, + "queue_totals": { + "messages": 0, + "messages_details": { + "rate": 0 + }, + "messages_ready": 0, + "messages_ready_details": { + "rate": 0 + }, + "messages_unacknowledged": 0, + "messages_unacknowledged_details": { + "rate": 0 + } + }, + "object_totals": { + "channels": 3, + "connections": 3, + "consumers": 2, + "exchanges": 9, + "queues": 1 + }, + "statistics_db_event_queue": 0, + "node": "rabbit@e2b1ae6390fd", + "listeners": [ + { + "node": "rabbit@e2b1ae6390fd", + "protocol": "amqp", + "ip_address": "::", + "port": 5672, + "socket_opts": { + "backlog": 128, + "nodelay": true, + "linger": [ + true, + 0 + ], + "exit_on_close": false + } + }, + { + "node": "rabbit@e2b1ae6390fd", + "protocol": "clustering", + "ip_address": "::", + "port": 25672, + "socket_opts": [] + }, + { + "node": "rabbit@e2b1ae6390fd", + "protocol": "http", + "ip_address": "::", + "port": 15672, + "socket_opts": { + "ssl": false, + "port": 15672 + } + } + ], + "contexts": [ + { + "ssl_opts": [], + "node": "rabbit@e2b1ae6390fd", + "description": "RabbitMQ Management", + "path": "/", + "ssl": "false", + "port": "15672" + } + ] +} diff --git a/metricbeat/module/rabbitmq/node/_meta/docs.asciidoc b/metricbeat/module/rabbitmq/node/_meta/docs.asciidoc index 5e0718e8de2..1b148a6b78e 100644 --- a/metricbeat/module/rabbitmq/node/_meta/docs.asciidoc +++ b/metricbeat/module/rabbitmq/node/_meta/docs.asciidoc @@ -1 +1,10 @@ -This is the `node` metricset of the RabbitMQ module. +This is the `node` metricset of the RabbitMQ module and collects metrics +about RabbitMQ nodes. + +The metricset has two modes to collect data which can be selected with the +`node.collect` setting: + +* `node`: collects metrics only from the node `metricbeat` connects to. This is the + default, as it is recommended to deploy `metricbeat` in all nodes. +* `cluster`: collects metrics from all the nodes in the cluster. This is recommended + when collecting metrics of an only endpoint for the whole cluster. diff --git a/metricbeat/module/rabbitmq/node/config.go b/metricbeat/module/rabbitmq/node/config.go new file mode 100644 index 00000000000..ceedb0544a0 --- /dev/null +++ b/metricbeat/module/rabbitmq/node/config.go @@ -0,0 +1,18 @@ +package node + +const ( + configCollectNode = "node" + configCollectCluster = "cluster" +) + +// Config for node metricset +type Config struct { + // Collect mode + // - `node` to collect metrics for endpoint only (default) + // - `cluster` to collect metrics for all nodes in the cluster + Collect string `config:"node.collect"` +} + +var defaultConfig = Config{ + Collect: configCollectNode, +} diff --git a/metricbeat/module/rabbitmq/node/data.go b/metricbeat/module/rabbitmq/node/data.go index a398bba528a..f22ccc2f08c 100644 --- a/metricbeat/module/rabbitmq/node/data.go +++ b/metricbeat/module/rabbitmq/node/data.go @@ -3,10 +3,9 @@ package node import ( "encoding/json" - "github.com/elastic/beats/libbeat/common" s "github.com/elastic/beats/libbeat/common/schema" c "github.com/elastic/beats/libbeat/common/schema/mapstriface" - "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/mb" ) var ( @@ -129,27 +128,22 @@ var ( } ) -func eventsMapping(content []byte) ([]common.MapStr, error) { +func eventsMapping(r mb.ReporterV2, content []byte) { var nodes []map[string]interface{} err := json.Unmarshal(content, &nodes) if err != nil { - logp.Err("Error: ", err) - return nil, err + r.Error(err) + return } - events := []common.MapStr{} - errors := s.NewErrors() - for _, node := range nodes { - event, errs := eventMapping(node) - events = append(events, event) - errors.AddErrors(errs) - + eventMapping(r, node) } - - return events, errors } -func eventMapping(node map[string]interface{}) (common.MapStr, *s.Errors) { - return schema.Apply(node) +func eventMapping(r mb.ReporterV2, node map[string]interface{}) { + event, _ := schema.Apply(node) + r.Event(mb.Event{ + MetricSetFields: event, + }) } diff --git a/metricbeat/module/rabbitmq/node/node.go b/metricbeat/module/rabbitmq/node/node.go index 3301729e7d7..0c2e53a7ed9 100644 --- a/metricbeat/module/rabbitmq/node/node.go +++ b/metricbeat/module/rabbitmq/node/node.go @@ -1,7 +1,10 @@ package node import ( - "github.com/elastic/beats/libbeat/common" + "encoding/json" + + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/rabbitmq" @@ -19,24 +22,88 @@ type MetricSet struct { *rabbitmq.MetricSet } +// ClusterMetricSet is the MetricSet type used when node.collect is "all" +type ClusterMetricSet struct { + *rabbitmq.MetricSet +} + // New creates new instance of MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Beta("The rabbitmq node metricset is beta") - ms, err := rabbitmq.NewMetricSet(base, rabbitmq.NodesPath) - if err != nil { + config := defaultConfig + if err := base.Module().UnpackConfig(&config); err != nil { return nil, err } - return &MetricSet{ms}, nil + + switch config.Collect { + case configCollectNode: + ms, err := rabbitmq.NewMetricSet(base, rabbitmq.OverviewPath) + if err != nil { + return nil, err + } + + return &MetricSet{ms}, nil + case configCollectCluster: + ms, err := rabbitmq.NewMetricSet(base, rabbitmq.NodesPath) + if err != nil { + return nil, err + } + + return &ClusterMetricSet{ms}, nil + default: + return nil, errors.Errorf("incorrect node.collect: %s", config.Collect) + } } -func (m *MetricSet) Fetch() ([]common.MapStr, error) { - content, err := m.HTTP.FetchContent() +type apiOverview struct { + Node string `json:"node"` +} +func (m *MetricSet) fetchOverview() (*apiOverview, error) { + d, err := m.HTTP.FetchContent() if err != nil { return nil, err } - events, _ := eventsMapping(content) - return events, nil + var apiOverview apiOverview + err = json.Unmarshal(d, &apiOverview) + if err != nil { + return nil, errors.Wrap(err, string(d)) + } + return &apiOverview, nil +} + +// Fetch metrics from rabbitmq node +func (m *MetricSet) Fetch(r mb.ReporterV2) { + o, err := m.fetchOverview() + if err != nil { + r.Error(err) + return + } + + node, err := rabbitmq.NewMetricSet(m.BaseMetricSet, rabbitmq.NodesPath+"/"+o.Node) + if err != nil { + r.Error(err) + return + } + + content, err := node.HTTP.FetchJSON() + if err != nil { + r.Error(err) + return + } + + eventMapping(r, content) +} + +// Fetch metrics from all rabbitmq nodes in the cluster +func (m *ClusterMetricSet) Fetch(r mb.ReporterV2) { + content, err := m.HTTP.FetchContent() + if err != nil { + r.Error(err) + return + } + + eventsMapping(r, content) } diff --git a/metricbeat/module/rabbitmq/node/node_integration_test.go b/metricbeat/module/rabbitmq/node/node_integration_test.go index f7565534616..7368d527901 100644 --- a/metricbeat/module/rabbitmq/node/node_integration_test.go +++ b/metricbeat/module/rabbitmq/node/node_integration_test.go @@ -14,8 +14,8 @@ import ( func TestData(t *testing.T) { compose.EnsureUp(t, "rabbitmq") - f := mbtest.NewEventsFetcher(t, getConfig()) - err := mbtest.WriteEvents(f, t) + ms := mbtest.NewReportingMetricSetV2(t, getConfig()) + err := mbtest.WriteEventsReporterV2(ms, t, "") if err != nil { t.Fatal("write", err) } diff --git a/metricbeat/module/rabbitmq/node/node_test.go b/metricbeat/module/rabbitmq/node/node_test.go index 272b5751f9c..1b0c9460df5 100644 --- a/metricbeat/module/rabbitmq/node/node_test.go +++ b/metricbeat/module/rabbitmq/node/node_test.go @@ -13,102 +13,129 @@ import ( "github.com/stretchr/testify/assert" ) -func TestFetchEventContents(t *testing.T) { +func nodeTestServer() *httptest.Server { absPath, _ := filepath.Abs("../_meta/testdata/") - response, _ := ioutil.ReadFile(absPath + "/node_sample_response.json") + nodeResponse, _ := ioutil.ReadFile(absPath + "/node_sample_response.json") + nodesResponse := []byte("[" + string(nodeResponse) + "]") + overviewResponse, _ := ioutil.ReadFile(absPath + "/overview_sample_response.json") notFound, _ := ioutil.ReadFile(absPath + "/notfound_response.json") server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - switch r.URL.Path { - case "/api/nodes": + switch { + case r.URL.Path == "/api/overview": w.WriteHeader(200) w.Header().Set("Content-Type", "application/json;") - w.Write([]byte(response)) + w.Write(overviewResponse) + case r.URL.Path == "/api/nodes": + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + w.Write(nodesResponse) + case r.URL.Path == "/api/nodes/rabbit@e2b1ae6390fd": + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + w.Write(nodeResponse) default: w.WriteHeader(404) w.Header().Set("Content-Type", "application/json;") w.Write([]byte(notFound)) } })) + return server +} + +func TestFetchNodeEventContents(t *testing.T) { + testFetch(t, configCollectNode) +} + +func TestFetchClusterEventContents(t *testing.T) { + testFetch(t, configCollectCluster) +} + +func testFetch(t *testing.T, collect string) { + server := nodeTestServer() defer server.Close() config := map[string]interface{}{ - "module": "rabbitmq", - "metricsets": []string{"node"}, - "hosts": []string{server.URL}, + "module": "rabbitmq", + "metricsets": []string{"node"}, + "hosts": []string{server.URL}, + "node.collect": collect, } - f := mbtest.NewEventsFetcher(t, config) - events, err := f.Fetch() - event := events[0] - if !assert.NoError(t, err) { + ms := mbtest.NewReportingMetricSetV2(t, config) + events, errors := mbtest.ReportingFetchV2(ms) + if !assert.True(t, len(errors) == 0, "There shouldn't be errors") { + t.Log(errors) + } + if !assert.True(t, len(events) > 0, "There should be events") { t.FailNow() } + event := events[0].MetricSetFields - t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event.StringToPrint()) + t.Logf("%s/%s event: %+v", ms.Module().Name(), ms.Name(), event.StringToPrint()) disk := event["disk"].(common.MapStr) free := disk["free"].(common.MapStr) - assert.EqualValues(t, 313352192, free["bytes"]) + assert.EqualValues(t, 98317942784, free["bytes"]) limit := free["limit"].(common.MapStr) assert.EqualValues(t, 50000000, limit["bytes"]) fd := event["fd"].(common.MapStr) - assert.EqualValues(t, 65536, fd["total"]) - assert.EqualValues(t, 54, fd["used"]) + assert.EqualValues(t, 1048576, fd["total"]) + assert.EqualValues(t, 31, fd["used"]) gc := event["gc"].(common.MapStr) num := gc["num"].(common.MapStr) - assert.EqualValues(t, 3184, num["count"]) + assert.EqualValues(t, 1049055, num["count"]) reclaimed := gc["reclaimed"].(common.MapStr) - assert.EqualValues(t, 270119840, reclaimed["bytes"]) + assert.EqualValues(t, 27352751800, reclaimed["bytes"]) io := event["io"].(common.MapStr) file_handle := io["file_handle"].(common.MapStr) open_attempt := file_handle["open_attempt"].(common.MapStr) avg := open_attempt["avg"].(common.MapStr) assert.EqualValues(t, 0, avg["ms"]) - assert.EqualValues(t, 10, open_attempt["count"]) + assert.EqualValues(t, 597670, open_attempt["count"]) read := io["read"].(common.MapStr) avg = read["avg"].(common.MapStr) - assert.EqualValues(t, 33, avg["ms"]) + assert.EqualValues(t, 0, avg["ms"]) assert.EqualValues(t, 1, read["bytes"]) - assert.EqualValues(t, 1, read["count"]) + assert.EqualValues(t, 3, read["count"]) reopen := io["reopen"].(common.MapStr) - assert.EqualValues(t, 1, reopen["count"]) + assert.EqualValues(t, 3, reopen["count"]) seek := io["seek"].(common.MapStr) avg = seek["avg"].(common.MapStr) assert.EqualValues(t, 0, avg["ms"]) - assert.EqualValues(t, 0, seek["count"]) + assert.EqualValues(t, 23, seek["count"]) sync := io["sync"].(common.MapStr) avg = sync["avg"].(common.MapStr) - assert.EqualValues(t, 0, avg["ms"]) - assert.EqualValues(t, 0, sync["count"]) + assert.EqualValues(t, 2, avg["ms"]) + assert.EqualValues(t, 149402, sync["count"]) write := io["write"].(common.MapStr) avg = write["avg"].(common.MapStr) assert.EqualValues(t, 0, avg["ms"]) - assert.EqualValues(t, 0, write["bytes"]) - assert.EqualValues(t, 0, write["count"]) + assert.EqualValues(t, 36305460, write["bytes"]) + assert.EqualValues(t, 149402, write["count"]) mem := event["mem"].(common.MapStr) limit = mem["limit"].(common.MapStr) - assert.EqualValues(t, 413047193, limit["bytes"]) + assert.EqualValues(t, 6628692787, limit["bytes"]) used := mem["used"].(common.MapStr) - assert.EqualValues(t, 57260080, used["bytes"]) + assert.EqualValues(t, 105504768, used["bytes"]) mnesia := event["mnesia"].(common.MapStr) disk = mnesia["disk"].(common.MapStr) tx := disk["tx"].(common.MapStr) - assert.EqualValues(t, 0, tx["count"]) + assert.EqualValues(t, 1, tx["count"]) ram := mnesia["ram"].(common.MapStr) tx = ram["tx"].(common.MapStr) - assert.EqualValues(t, 11, tx["count"]) + assert.EqualValues(t, 92, tx["count"]) msg := event["msg"].(common.MapStr) store_read := msg["store_read"].(common.MapStr) @@ -116,31 +143,31 @@ func TestFetchEventContents(t *testing.T) { store_write := msg["store_write"].(common.MapStr) assert.EqualValues(t, 0, store_write["count"]) - assert.EqualValues(t, "rabbit@prcdsrvv1682", event["name"]) + assert.EqualValues(t, "rabbit@e2b1ae6390fd", event["name"]) proc := event["proc"].(common.MapStr) assert.EqualValues(t, 1048576, proc["total"]) - assert.EqualValues(t, 322, proc["used"]) + assert.EqualValues(t, 403, proc["used"]) - assert.EqualValues(t, 2, event["processors"]) + assert.EqualValues(t, 4, event["processors"]) queue := event["queue"].(common.MapStr) index := queue["index"].(common.MapStr) journal_write := index["journal_write"].(common.MapStr) - assert.EqualValues(t, 0, journal_write["count"]) + assert.EqualValues(t, 448230, journal_write["count"]) read = index["read"].(common.MapStr) assert.EqualValues(t, 0, read["count"]) write = index["write"].(common.MapStr) - assert.EqualValues(t, 0, write["count"]) + assert.EqualValues(t, 2, write["count"]) run := event["run"].(common.MapStr) assert.EqualValues(t, 0, run["queue"]) socket := event["socket"].(common.MapStr) - assert.EqualValues(t, 58890, socket["total"]) - assert.EqualValues(t, 0, socket["used"]) + assert.EqualValues(t, 943626, socket["total"]) + assert.EqualValues(t, 3, socket["used"]) assert.EqualValues(t, "disc", event["type"]) - assert.EqualValues(t, 37139, event["uptime"]) + assert.EqualValues(t, 98754834, event["uptime"]) } diff --git a/metricbeat/module/rabbitmq/url.go b/metricbeat/module/rabbitmq/url.go index 0c22a7ed249..bbcd2d491da 100644 --- a/metricbeat/module/rabbitmq/url.go +++ b/metricbeat/module/rabbitmq/url.go @@ -9,6 +9,7 @@ const ( ConnectionsPath = "/api/connections" ExchangesPath = "/api/exchanges" NodesPath = "/api/nodes" + OverviewPath = "/api/overview" QueuesPath = "/api/queues" )