diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 8d52e5b23a97..0b9fbdd16d89 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -56,6 +56,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta1...master[Check the HEAD di - Add support to exclude labels from kubernetes pod metadata. {pull}4757[4757] - Add graphite protocol metricbeat module. {pull}4734[4734] +- Add http server metricset to support push metrics via http. {pull}4770[4770] *Packetbeat* diff --git a/metricbeat/_meta/beat.full.yml b/metricbeat/_meta/beat.full.yml new file mode 100644 index 000000000000..46b7f45f983c --- /dev/null +++ b/metricbeat/_meta/beat.full.yml @@ -0,0 +1,447 @@ +########################## Metricbeat Configuration ########################### + +# This file is a full configuration example documenting all non-deprecated +# options in comments. For a shorter configuration example, that contains only +# the most common options, please see metricbeat.yml in the same directory. +# +# You can find the full configuration reference here: +# https://www.elastic.co/guide/en/beats/metricbeat/index.html + +#============================ Config Reloading =============================== + +# Config reloading allows to dynamically load modules. Each file which is +# monitored must contain one or multiple modules as a list. +metricbeat.config.modules: + + # Glob pattern for configuration reloading + path: ${path.config}/conf.d/*.yml + + # Period on which files under path should be checked for changes + reload.period: 10s + + # Set to true to enable config reloading + reload.enabled: false + +#========================== Modules configuration ============================ +metricbeat.modules: + +#------------------------------- System Module ------------------------------- +- module: system + metricsets: + # CPU stats + - cpu + + # System Load stats + - load + + # Per CPU core stats + #- core + + # IO stats + #- diskio + + # Per filesystem stats + - filesystem + + # File system summary stats + - fsstat + + # Memory stats + - memory + + # Network stats + - network + + # Processes summary + - process_summary + + # Per process stats + - process + + # Sockets and connection info (linux only) + #- socket + enabled: true + period: 10s + processes: ['.*'] + + # if true, exports the CPU usage in ticks, together with the percentage values + #cpu_ticks: false + + # These options allow you to filter out all processes that are not + # in the top N by CPU or memory, in order to reduce the number of documents created. + # If both the `by_cpu` and `by_memory` options are used, the union of the two sets + # is included. + #process.include_top_n: + # + # Set to false to disable this feature and include all processes + #enabled: true + + # How many processes to include from the top by CPU. The processes are sorted + # by the `system.process.cpu.total.pct` field. + #by_cpu: 0 + + # How many processes to include from the top by memory. The processes are sorted + # by the `system.process.memory.rss.bytes` field. + #by_memory: 0 + + # If false, cmdline of a process is not cached. + #process.cmdline.cache.enabled: true + + # Enable collection of cgroup metrics from processes on Linux. + #process.cgroups.enabled: true + + # A list of regular expressions used to whitelist environment variables + # reported with the process metricset's events. Defaults to empty. + #process.env.whitelist: [] + + # Configure reverse DNS lookup on remote IP addresses in the socket metricset. + #socket.reverse_lookup.enabled: false + #socket.reverse_lookup.success_ttl: 60s + #socket.reverse_lookup.failure_ttl: 60s + +#------------------------------- Apache Module ------------------------------- +- module: apache + metricsets: ["status"] + enabled: false + period: 10s + + # Apache hosts + hosts: ["http://127.0.0.1"] + + # Path to server status. Default server-status + #server_status_path: "server-status" + + # Username of hosts. Empty by default + #username: username + + # Password of hosts. Empty by default + #password: password + +#-------------------------------- Audit Module ------------------------------- +- module: audit + enabled: false + metricsets: ["kernel"] + kernel.resolve_ids: true + kernel.backlog_limit: 8196 + kernel.rate_limit: 0 + kernel.include_raw_message: false + kernel.include_warnings: false + +#-------------------------------- ceph Module -------------------------------- +- module: ceph + metricsets: ["cluster_disk", "cluster_health", "monitor_health", "pool_disk"] + enabled: false + period: 10s + hosts: ["localhost:5000"] + +#------------------------------ Couchbase Module ----------------------------- +- module: couchbase + metricsets: ["bucket", "cluster", "node"] + enabled: false + period: 10s + hosts: ["localhost:8091"] + +#------------------------------- Docker Module ------------------------------- +- module: docker + metricsets: ["container", "cpu", "diskio", "healthcheck", "info", "memory", "network"] + hosts: ["unix:///var/run/docker.sock"] + enabled: false + period: 10s + + # To connect to Docker over TLS you must specify a client and CA certificate. + #ssl: + #certificate_authority: "/etc/pki/root/ca.pem" + #certificate: "/etc/pki/client/cert.pem" + #key: "/etc/pki/client/cert.key" + +#----------------------------- Dropwizard Module ----------------------------- +- module: dropwizard + metricsets: ["collector"] + enabled: false + period: 10s + hosts: ["localhost:8080"] + metrics_path: /metrics/metrics + namespace: example + +#---------------------------- elasticsearch Module --------------------------- +- module: elasticsearch + metricsets: ["node", "node_stats"] + enabled: false + period: 10s + hosts: ["localhost:9200"] + + + +#------------------------------- golang Module ------------------------------- +- module: golang + metricsets: ["expvar","heap"] + enabled: false + period: 10s + hosts: ["localhost:6060"] + heap.path: "/debug/vars" + expvar: + namespace: "example" + path: "/debug/vars" + +#------------------------------- HAProxy Module ------------------------------ +- module: haproxy + metricsets: ["info", "stat"] + enabled: false + period: 10s + hosts: ["tcp://127.0.0.1:14567"] + +#-------------------------------- HTTP Module -------------------------------- +- module: http + metricsets: ["json"] + enabled: false + period: 10s + hosts: ["localhost:80"] + namespace: "json_namespace" + path: "/" + #body: "" + #method: "GET" + #request.enabled: false + #response.enabled: false + +#------------------------------- Jolokia Module ------------------------------ +- module: jolokia + metricsets: ["jmx"] + enabled: false + period: 10s + hosts: ["localhost"] + namespace: "metrics" + path: "/jolokia/?ignoreErrors=true&canonicalNaming=false" + jmx.mapping: + jmx.application: + jmx.instance: + +#-------------------------------- kafka Module ------------------------------- +- module: kafka + metricsets: ["partition"] + enabled: false + period: 10s + hosts: ["localhost:9092"] + + #client_id: metricbeat + #retries: 3 + #backoff: 250ms + + # List of Topics to query metadata for. If empty, all topics will be queried. + #topics: [] + + # Optional SSL. By default is off. + # List of root certificates for HTTPS server verifications + #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] + + # Certificate for SSL client authentication + #ssl.certificate: "/etc/pki/client/cert.pem" + + # Client Certificate Key + #ssl.key: "/etc/pki/client/cert.key" + + # SASL authentication + #username: "" + #password: "" + +#------------------------------- kibana Module ------------------------------- +- module: kibana + metricsets: ["status"] + enabled: false + period: 10s + hosts: ["localhost:5601"] + +#----------------------------- kubernetes Module ----------------------------- +# Node metrics, from kubelet: +- module: kubernetes + enabled: false + metricsets: + - node + - system + - pod + - container + - volume + period: 10s + hosts: ["localhost:10255"] + +# State metrics from kube-state-metrics service: +- module: kubernetes + enabled: false + metricsets: + - state_node + - state_deployment + - state_replicaset + - state_pod + - state_container + period: 10s + hosts: ["kube-state-metrics:8080"] + +# Kubernetes events +- module: kubernetes + metricsets: + - event + kube_config: ${HOME}/.kube/config + in_cluster: false + +#------------------------------ memcached Module ----------------------------- +- module: memcached + metricsets: ["stats"] + enabled: false + period: 10s + hosts: ["localhost:11211"] + + +#------------------------------- MongoDB Module ------------------------------ +- module: mongodb + metricsets: ["dbstats", "status"] + enabled: false + period: 10s + + # The hosts must be passed as MongoDB URLs in the format: + # [mongodb://][user:pass@]host[:port]. + # The username and password can also be set using the respective configuration + # options. The credentials in the URL take precedence over the username and + # password configuration options. + hosts: ["localhost:27017"] + + # Username to use when connecting to MongoDB. Empty by default. + #username: user + + # Password to use when connecting to MongoDB. Empty by default. + #password: pass + +#-------------------------------- MySQL Module ------------------------------- +- module: mysql + metricsets: ["status"] + enabled: false + period: 10s + + # Host DSN should be defined as "user:pass@tcp(127.0.0.1:3306)/" + # The username and password can either be set in the DSN or using the username + # and password config options. Those specified in the DSN take precedence. + hosts: ["root:secret@tcp(127.0.0.1:3306)/"] + + # Username of hosts. Empty by default. + #username: root + + # Password of hosts. Empty by default. + #password: secret + + # By setting raw to true, all raw fields from the status metricset will be added to the event. + #raw: false + +#-------------------------------- Nginx Module ------------------------------- +#- module: nginx + #metricsets: ["stubstatus"] + #enabled: true + #period: 10s + + # Nginx hosts + #hosts: ["http://127.0.0.1"] + + # Path to server status. Default server-status + #server_status_path: "server-status" + +#------------------------------- php_fpm Module ------------------------------ +- module: php_fpm + metricsets: ["pool"] + enabled: false + period: 10s + status_path: "/status" + hosts: ["localhost:8080"] + +#----------------------------- PostgreSQL Module ----------------------------- +- module: postgresql + metricsets: + # Stats about every PostgreSQL database + - database + + # Stats about the background writer process's activity + - bgwriter + + # Stats about every PostgreSQL process + - activity + + enabled: false + period: 10s + + # The host must be passed as PostgreSQL URL. Example: + # postgres://localhost:5432?sslmode=disable + # The available parameters are documented here: + # https://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters + hosts: ["postgres://localhost:5432"] + + # Username to use when connecting to PostgreSQL. Empty by default. + #username: user + + # Password to use when connecting to PostgreSQL. Empty by default. + #password: pass + + +#----------------------------- Prometheus Module ----------------------------- +- module: prometheus + metricsets: ["stats"] + enabled: false + period: 10s + hosts: ["localhost:9090"] + metrics_path: /metrics + #namespace: example + +#-------------------------------- Redis Module ------------------------------- +- module: redis + metricsets: ["info", "keyspace"] + enabled: false + period: 10s + + # Redis hosts + hosts: ["127.0.0.1:6379"] + + # Timeout after which time a metricset should return an error + # Timeout is by default defined as period, as a fetch of a metricset + # should never take longer then period, as otherwise calls can pile up. + #timeout: 1s + + # Optional fields to be added to each event + #fields: + # datacenter: west + + # Network type to be used for redis connection. Default: tcp + #network: tcp + + # Max number of concurrent connections. Default: 10 + #maxconn: 10 + + # Filters can be used to reduce the number of fields sent. + #filters: + # - include_fields: + # fields: ["stats"] + + # Redis AUTH password. Empty by default. + #password: foobared + +#------------------------------- vsphere Module ------------------------------ +- module: vsphere + metricsets: ["datastore, host, virtualmachine"] + enabled: false + period: 10s + hosts: ["https://localhost/sdk"] + + username: "user" + password: "password" + # If insecure is true, don't verify the server's certificate chain + insecure: false + +#------------------------------- Windows Module ------------------------------ +- module: windows + metricsets: ["perfmon"] + enabled: false + period: 10s + perfmon.counters: + +#------------------------------ ZooKeeper Module ----------------------------- +- module: zookeeper + metricsets: ["mntr"] + enabled: false + period: 10s + hosts: ["localhost:2181"] + + diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index ca75ffbf3f39..278733e95c2d 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -3874,6 +3874,12 @@ The HTTP payload received json metricset +[float] +== server fields + +server + + [[exported-fields-jolokia]] == Jolokia fields diff --git a/metricbeat/docs/modules/http.asciidoc b/metricbeat/docs/modules/http.asciidoc index 5c6bef9e3e58..46cd1f66aaa8 100644 --- a/metricbeat/docs/modules/http.asciidoc +++ b/metricbeat/docs/modules/http.asciidoc @@ -33,6 +33,17 @@ metricbeat.modules: #method: "GET" #request.enabled: false #response.enabled: false + +- module: http + metricsets: ["server"] + host: "localhost" + port: "8080" + enabled: false +# paths: +# - path: "/foo" +# namespace: "foo" +# fields: # added to the the response in root. overwrites existing fields +# key: "value" ---- [float] @@ -42,5 +53,9 @@ The following metricsets are available: * <> +* <> + include::http/json.asciidoc[] +include::http/server.asciidoc[] + diff --git a/metricbeat/docs/modules/http/server.asciidoc b/metricbeat/docs/modules/http/server.asciidoc new file mode 100644 index 000000000000..995d469120d9 --- /dev/null +++ b/metricbeat/docs/modules/http/server.asciidoc @@ -0,0 +1,19 @@ +//// +This file is generated! See scripts/docs_collector.py +//// + +[[metricbeat-metricset-http-server]] +include::../../../module/http/server/_meta/docs.asciidoc[] + + +==== Fields + +For a description of each field in the metricset, see the +<> section. + +Here is an example document generated by this metricset: + +[source,json] +---- +include::../../../module/http/server/_meta/data.json[] +---- diff --git a/metricbeat/helper/server/http/config.go b/metricbeat/helper/server/http/config.go new file mode 100644 index 000000000000..115c747aa3e6 --- /dev/null +++ b/metricbeat/helper/server/http/config.go @@ -0,0 +1,13 @@ +package http + +type HttpConfig struct { + Host string `config:"host"` + Port int `config:"port"` +} + +func defaultHttpConfig() HttpConfig { + return HttpConfig{ + Host: "localhost", + Port: 8080, + } +} diff --git a/metricbeat/helper/server/http/http.go b/metricbeat/helper/server/http/http.go new file mode 100644 index 000000000000..9346d4e473ba --- /dev/null +++ b/metricbeat/helper/server/http/http.go @@ -0,0 +1,117 @@ +package http + +import ( + "context" + "fmt" + "io/ioutil" + "net/http" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper/server" + "github.com/elastic/beats/metricbeat/mb" +) + +type HttpServer struct { + server *http.Server + ctx context.Context + stop context.CancelFunc + done chan struct{} + eventQueue chan server.Event +} + +type HttpEvent struct { + event common.MapStr + meta server.Meta +} + +func (h *HttpEvent) GetEvent() common.MapStr { + return h.event +} + +func (h *HttpEvent) GetMeta() server.Meta { + return h.meta +} + +func NewHttpServer(mb mb.BaseMetricSet) (server.Server, error) { + config := defaultHttpConfig() + err := mb.Module().UnpackConfig(&config) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(context.Background()) + h := &HttpServer{ + done: make(chan struct{}), + eventQueue: make(chan server.Event), + ctx: ctx, + stop: cancel, + } + + httpServer := &http.Server{ + Addr: fmt.Sprintf("%s:%d", config.Host, config.Port), + Handler: http.HandlerFunc(h.handleFunc), + } + h.server = httpServer + + return h, nil +} + +func (h *HttpServer) Start() { + go func() { + + logp.Info("Starting http server on %s", h.server.Addr) + err := h.server.ListenAndServe() + if err != nil { + logp.Critical("Unable to start HTTP server due to error: %v", err) + } + }() + +} + +func (h *HttpServer) Stop() { + close(h.done) + h.stop() + h.server.Shutdown(h.ctx) + close(h.eventQueue) +} + +func (h *HttpServer) GetEvents() chan server.Event { + return h.eventQueue +} + +func (h *HttpServer) handleFunc(writer http.ResponseWriter, req *http.Request) { + switch req.Method { + case "POST": + meta := server.Meta{ + "path": req.URL.String(), + } + + contentType := req.Header.Get("Content-Type") + if contentType != "" { + meta["Content-Type"] = contentType + } + + body, err := ioutil.ReadAll(req.Body) + if err != nil { + logp.Err("Error reading body: %v", err) + http.Error(writer, "Unexpected error reading request payload", http.StatusBadRequest) + return + } + + payload := common.MapStr{ + server.EventDataKey: body, + } + + event := &HttpEvent{ + event: payload, + meta: meta, + } + h.eventQueue <- event + writer.WriteHeader(http.StatusAccepted) + + case "GET": + writer.WriteHeader(http.StatusOK) + writer.Write([]byte("HTTP Server accepts data via POST")) + } +} diff --git a/metricbeat/helper/server/http/http_test.go b/metricbeat/helper/server/http/http_test.go new file mode 100644 index 000000000000..15bbfe1b36b2 --- /dev/null +++ b/metricbeat/helper/server/http/http_test.go @@ -0,0 +1,74 @@ +// +build !integration + +package http + +import ( + "bytes" + "context" + "fmt" + "net/http" + "testing" + "time" + + "github.com/elastic/beats/metricbeat/helper/server" + + "github.com/stretchr/testify/assert" +) + +func GetHttpServer(host string, port int) (server.Server, error) { + ctx, cancel := context.WithCancel(context.Background()) + h := &HttpServer{ + done: make(chan struct{}), + eventQueue: make(chan server.Event, 1), + ctx: ctx, + stop: cancel, + } + + httpServer := &http.Server{ + Addr: fmt.Sprintf("%s:%d", host, port), + Handler: http.HandlerFunc(h.handleFunc), + } + h.server = httpServer + + return h, nil +} + +func TestHttpServer(t *testing.T) { + host := "127.0.0.1" + port := 40050 + svc, err := GetHttpServer(host, port) + if err != nil { + t.Error(err) + t.FailNow() + } + + svc.Start() + defer svc.Stop() + // make sure server is up before writing data into it. + time.Sleep(2 * time.Second) + writeToServer(t, "test1", host, port) + msg := <-svc.GetEvents() + + assert.True(t, msg.GetEvent() != nil) + ok, _ := msg.GetEvent().HasKey("data") + assert.True(t, ok) + bytes, _ := msg.GetEvent()["data"].([]byte) + assert.True(t, string(bytes) == "test1") + +} + +func writeToServer(t *testing.T, message, host string, port int) { + url := fmt.Sprintf("http://%s:%d/", host, port) + var str = []byte(message) + req, err := http.NewRequest("POST", url, bytes.NewBuffer(str)) + req.Header.Set("Content-Type", "text/plain") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + t.Error(err) + t.FailNow() + } + defer resp.Body.Close() + +} diff --git a/metricbeat/include/list.go b/metricbeat/include/list.go index 1d02db498273..fa77791b8290 100644 --- a/metricbeat/include/list.go +++ b/metricbeat/include/list.go @@ -45,6 +45,7 @@ import ( _ "github.com/elastic/beats/metricbeat/module/haproxy/stat" _ "github.com/elastic/beats/metricbeat/module/http" _ "github.com/elastic/beats/metricbeat/module/http/json" + _ "github.com/elastic/beats/metricbeat/module/http/server" _ "github.com/elastic/beats/metricbeat/module/jolokia" _ "github.com/elastic/beats/metricbeat/module/jolokia/jmx" _ "github.com/elastic/beats/metricbeat/module/kafka" diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 9361ab4b73c7..d4e6efa5dc7c 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -185,6 +185,17 @@ metricbeat.modules: #request.enabled: false #response.enabled: false +- module: http + metricsets: ["server"] + host: "localhost" + port: "8080" + enabled: false +# paths: +# - path: "/foo" +# namespace: "foo" +# fields: # added to the the response in root. overwrites existing fields +# key: "value" + #------------------------------- Jolokia Module ------------------------------ - module: jolokia metricsets: ["jmx"] diff --git a/metricbeat/module/http/_meta/config.yml b/metricbeat/module/http/_meta/config.yml index aa40cfb7cc1e..32be789a932a 100644 --- a/metricbeat/module/http/_meta/config.yml +++ b/metricbeat/module/http/_meta/config.yml @@ -8,3 +8,14 @@ #method: "GET" #request.enabled: false #response.enabled: false + +- module: http + metricsets: ["server"] + host: "localhost" + port: "8080" + enabled: false +# paths: +# - path: "/foo" +# namespace: "foo" +# fields: # added to the the response in root. overwrites existing fields +# key: "value" diff --git a/metricbeat/module/http/server/_meta/data.json b/metricbeat/module/http/server/_meta/data.json new file mode 100644 index 000000000000..26af4a231cf9 --- /dev/null +++ b/metricbeat/module/http/server/_meta/data.json @@ -0,0 +1,19 @@ +{ + "@timestamp":"2016-05-23T08:05:34.853Z", + "beat":{ + "hostname":"beathost", + "name":"beathost" + }, + "metricset":{ + "host":"localhost", + "module":"http", + "name":"server", + "rtt":44269 + }, + "http":{ + "server":{ + "test_metric": 5, + } + }, + "type":"metricsets" +} diff --git a/metricbeat/module/http/server/_meta/docs.asciidoc b/metricbeat/module/http/server/_meta/docs.asciidoc new file mode 100644 index 000000000000..5656331c2ca4 --- /dev/null +++ b/metricbeat/module/http/server/_meta/docs.asciidoc @@ -0,0 +1,3 @@ +=== http server MetricSet + +This is the server metricset of the module http. diff --git a/metricbeat/module/http/server/_meta/fields.yml b/metricbeat/module/http/server/_meta/fields.yml new file mode 100644 index 000000000000..43c3f8aa3220 --- /dev/null +++ b/metricbeat/module/http/server/_meta/fields.yml @@ -0,0 +1,5 @@ +- name: server + type: group + description: > + server + fields: diff --git a/metricbeat/module/http/server/config.go b/metricbeat/module/http/server/config.go new file mode 100644 index 000000000000..2a7d9355dc4c --- /dev/null +++ b/metricbeat/module/http/server/config.go @@ -0,0 +1,39 @@ +package server + +import ( + "errors" + + "github.com/elastic/beats/libbeat/common" +) + +type httpServerConfig struct { + Paths []pathConfig `config:"paths"` + DefaultPath pathConfig `config:"default_path"` +} + +type pathConfig struct { + Path string `config:"path"` + Fields common.MapStr `config:"fields"` + Namespace string `config:"namespace"` +} + +func defaultHttpServerConfig() httpServerConfig { + return httpServerConfig{ + DefaultPath: pathConfig{ + Path: "/", + Namespace: "http", + }, + } +} + +func (p pathConfig) Validate() error { + if p.Namespace == "" { + return errors.New("`namespace` can not be empty in path configuration") + } + + if p.Path == "" { + return errors.New("`path` can not be empty in path configuration") + } + + return nil +} diff --git a/metricbeat/module/http/server/data.go b/metricbeat/module/http/server/data.go new file mode 100644 index 000000000000..220572483260 --- /dev/null +++ b/metricbeat/module/http/server/data.go @@ -0,0 +1,96 @@ +package server + +import ( + "encoding/json" + "errors" + "fmt" + "strings" + "sync" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/helper/server" + "github.com/elastic/beats/metricbeat/mb" +) + +type metricProcessor struct { + paths map[string]pathConfig + defaultPath pathConfig + sync.RWMutex +} + +func NewMetricProcessor(paths []pathConfig, defaultPath pathConfig) *metricProcessor { + pathMap := map[string]pathConfig{} + for _, path := range paths { + pathMap[path.Path] = path + } + + return &metricProcessor{ + paths: pathMap, + defaultPath: defaultPath, + } +} + +func (m *metricProcessor) AddPath(path pathConfig) { + m.Lock() + m.paths[path.Path] = path + m.Unlock() +} + +func (m *metricProcessor) RemovePath(path pathConfig) { + m.Lock() + delete(m.paths, path.Path) + m.Unlock() +} + +func (p *metricProcessor) Process(event server.Event) (common.MapStr, error) { + urlRaw, ok := event.GetMeta()["path"] + if !ok { + return nil, errors.New("Malformed HTTP event. Path missing.") + } + url, _ := urlRaw.(string) + + typeRaw, ok := event.GetMeta()["Content-Type"] + if !ok { + return nil, errors.New("Unable to get Content-Type of request") + } + contentType := typeRaw.(string) + pathConf := p.findPath(url) + + bytesRaw, ok := event.GetEvent()[server.EventDataKey] + if !ok { + return nil, errors.New("Unable to retrieve response bytes") + } + + bytes, _ := bytesRaw.([]byte) + if len(bytes) == 0 { + return nil, errors.New("Request has no data") + } + + out := common.MapStr{} + switch contentType { + case "application/json": + err := json.Unmarshal(bytes, &out) + if err != nil { + return nil, err + } + default: + return nil, errors.New(fmt.Sprintf("Unsupported Content-Type: %s", contentType)) + } + + out[mb.NamespaceKey] = pathConf.Namespace + if len(pathConf.Fields) != 0 { + // Overwrite any keys that are present in the incoming payload + common.MergeFields(out, pathConf.Fields, true) + } + return out, nil +} + +func (p *metricProcessor) findPath(url string) *pathConfig { + for path, conf := range p.paths { + if strings.Index(url, path) == 0 { + return &conf + } + } + + return &p.defaultPath +} diff --git a/metricbeat/module/http/server/data_test.go b/metricbeat/module/http/server/data_test.go new file mode 100644 index 000000000000..cbd0a58d7c1a --- /dev/null +++ b/metricbeat/module/http/server/data_test.go @@ -0,0 +1,79 @@ +package server + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" +) + +func GetMetricProcessor() *metricProcessor { + paths := []pathConfig{ + { + Namespace: "foo", + Path: "/foo", + Fields: common.MapStr{ + "a": "b", + }, + }, + { + Namespace: "bar", + Path: "/bar", + }, + } + + defaultPath := defaultHttpServerConfig().DefaultPath + return NewMetricProcessor(paths, defaultPath) +} + +func TestMetricProcessorAddPath(t *testing.T) { + processor := GetMetricProcessor() + temp := pathConfig{ + Namespace: "xyz", + Path: "/abc", + } + processor.AddPath(temp) + out, _ := processor.paths[temp.Path] + assert.NotNil(t, out) + assert.Equal(t, out.Namespace, temp.Namespace) +} + +func TestMetricProcessorDeletePath(t *testing.T) { + processor := GetMetricProcessor() + processor.RemovePath(processor.paths["bar"]) + _, ok := processor.paths["bar"] + assert.Equal(t, ok, false) +} + +func TestFindPath(t *testing.T) { + processor := GetMetricProcessor() + tests := []struct { + a string + expected pathConfig + }{ + { + a: "/foo/bar", + expected: processor.paths["/foo"], + }, + { + a: "/", + expected: processor.defaultPath, + }, + { + a: "/abc", + expected: processor.defaultPath, + }, + } + + for i, test := range tests { + a, expected := test.a, test.expected + name := fmt.Sprintf("%v: %v = %v", i, a, expected) + + t.Run(name, func(t *testing.T) { + b := processor.findPath(a) + assert.Equal(t, expected, *b) + }) + } +} diff --git a/metricbeat/module/http/server/server.go b/metricbeat/module/http/server/server.go new file mode 100644 index 000000000000..562908d484ac --- /dev/null +++ b/metricbeat/module/http/server/server.go @@ -0,0 +1,72 @@ +package server + +import ( + "github.com/elastic/beats/libbeat/common/cfgwarn" + serverhelper "github.com/elastic/beats/metricbeat/helper/server" + "github.com/elastic/beats/metricbeat/helper/server/http" + "github.com/elastic/beats/metricbeat/mb" +) + +// init registers the MetricSet with the central registry. +// The New method will be called after the setup of the module and before starting to fetch data +func init() { + if err := mb.Registry.AddMetricSet("http", "server", New); err != nil { + panic(err) + } +} + +// MetricSet type defines all fields of the MetricSet +// As a minimum it must inherit the mb.BaseMetricSet fields, but can be extended with +// additional entries. These variables can be used to persist data or configuration between +// multiple fetch calls. +type MetricSet struct { + mb.BaseMetricSet + server serverhelper.Server + processor *metricProcessor +} + +// New create a new instance of the MetricSet +// Part of new is also setting up the configuration by processing additional +// configuration entries if needed. +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + cfgwarn.Experimental("EXPERIMENTAL: The http server metricset is experimental") + + config := defaultHttpServerConfig() + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, err + } + + svc, err := http.NewHttpServer(base) + if err != nil { + return nil, err + } + + processor := NewMetricProcessor(config.Paths, config.DefaultPath) + return &MetricSet{ + BaseMetricSet: base, + server: svc, + processor: processor, + }, nil +} + +// Run method provides the Graphite server with a reporter with which events can be reported. +func (m *MetricSet) Run(reporter mb.PushReporter) { + // Start event watcher + m.server.Start() + + for { + select { + case <-reporter.Done(): + m.server.Stop() + return + case msg := <-m.server.GetEvents(): + event, err := m.processor.Process(msg) + if err != nil { + reporter.Error(err) + } else { + reporter.Event(event) + } + + } + } +} diff --git a/metricbeat/modules.d/http.yml.disabled b/metricbeat/modules.d/http.yml.disabled index aa40cfb7cc1e..32be789a932a 100644 --- a/metricbeat/modules.d/http.yml.disabled +++ b/metricbeat/modules.d/http.yml.disabled @@ -8,3 +8,14 @@ #method: "GET" #request.enabled: false #response.enabled: false + +- module: http + metricsets: ["server"] + host: "localhost" + port: "8080" + enabled: false +# paths: +# - path: "/foo" +# namespace: "foo" +# fields: # added to the the response in root. overwrites existing fields +# key: "value"