From 873500aed7ec54b45f4900e193691b4f647704d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Mon, 25 May 2020 15:30:53 +0200 Subject: [PATCH] New Filebeat input: http_endpoint (#18298) (#18719) ## What does this PR do? This filebeat input configures a HTTP port listener, accepting JSON formatted POST requests, which again is formatted into a event, initially the event is created with the "json." prefix and expects the ingest pipeline to mutate the event during ingestion. The initial set of features is based on the Logstash input plugin, but implemented differently: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-http.html ## Why is it important? This idea is based on a few different scenarios: - The user already has a large beats installation and no Logstash, and do not want to install Logstash solely for a single feature. - HTTP Input allows applications to be directly integrated with Elastic, without needing connectivity to Elasticsearch directly (or Logstash). - Allows us to integrate and create modules for any product that supports HTTP POST events like SOAR, cloud applications, ticketing systems etc etc. ## Features currently implemented - HTTP Basic Auth On/Off - HTTP/HTTPS configurable - Listening interface and port configurable - Response code on success configurable - Response body on success configurable - Response header on success configurable - Proper HTTP codes on both success and error responses - Message prefix configurable - URL to post to is configurable - SSL path to cert, key and CA is configurable. (cherry picked from commit 0b84f0ab6d654b65e1fbdbec46228b317eff5d19) Co-authored-by: Marius Iversen --- CHANGELOG.next.asciidoc | 1 + filebeat/docs/filebeat-options.asciidoc | 3 + .../docs/inputs/input-http-endpoint.asciidoc | 116 ++++++++ x-pack/filebeat/include/list.go | 1 + x-pack/filebeat/input/http_endpoint/config.go | 48 ++++ .../input/http_endpoint/httpserver.go | 77 +++++ x-pack/filebeat/input/http_endpoint/input.go | 270 ++++++++++++++++++ .../tests/system/test_http_endpoint.py | 218 ++++++++++++++ 8 files changed, 734 insertions(+) create mode 100644 x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc create mode 100644 x-pack/filebeat/input/http_endpoint/config.go create mode 100644 x-pack/filebeat/input/http_endpoint/httpserver.go create mode 100644 x-pack/filebeat/input/http_endpoint/input.go create mode 100644 x-pack/filebeat/tests/system/test_http_endpoint.py diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 49093b91964d..be2b6c708e72 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -412,6 +412,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Improve ECS categorization field mappings in osquery module. {issue}16176[16176] {pull}17881[17881] - Add support for v10, v11 and v12 logs on Postgres {issue}13810[13810] {pull}17732[17732] - Add dashboard for Google Cloud Audit and AWS CloudTrail. {pull}17379[17379] +- Added http_endpoint input{pull}18298[18298] - Add support for array parsing in azure-eventhub input. {pull}18585[18585] - The `logstash` module can now automatically detect the log file format (JSON or plaintext) and process it accordingly. {issue}9964[9964] {pull}18095[18095] - Improve ECS categorization field mappings in coredns module. {issue}16159[16159] {pull}18424[18424] diff --git a/filebeat/docs/filebeat-options.asciidoc b/filebeat/docs/filebeat-options.asciidoc index 6f9a49d43fd4..456588808544 100644 --- a/filebeat/docs/filebeat-options.asciidoc +++ b/filebeat/docs/filebeat-options.asciidoc @@ -47,6 +47,7 @@ You can configure {beatname_uc} to use the following inputs: * <<{beatname_lc}-input-container>> * <<{beatname_lc}-input-docker>> * <<{beatname_lc}-input-google-pubsub>> +* <<{beatname_lc}-input-http_endpoint>> * <<{beatname_lc}-input-httpjson>> * <<{beatname_lc}-input-kafka>> * <<{beatname_lc}-input-log>> @@ -73,6 +74,8 @@ include::inputs/input-docker.asciidoc[] include::../../x-pack/filebeat/docs/inputs/input-google-pubsub.asciidoc[] +include::../../x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc[] + include::../../x-pack/filebeat/docs/inputs/input-httpjson.asciidoc[] include::inputs/input-kafka.asciidoc[] diff --git a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc new file mode 100644 index 000000000000..2a949b01d26a --- /dev/null +++ b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc @@ -0,0 +1,116 @@ +[role="xpack"] + +:type: http_endpoint + +[id="{beatname_lc}-input-{type}"] +=== HTTP Endpoint input + +++++ +HTTP Endpoint +++++ + +beta[] + +Use the `http_endpoint` input to create a HTTP listener that can receive incoming HTTP POST requests. + +This input can for example be used to receive incoming webhooks from a third-party application or service. + +Example configurations: + +Basic example: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: http_endpoint + enabled: true + listen_address: 192.168.1.1 + listen_port: 8080 +---- + +Custom response example: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: http_endpoint + enabled: true + listen_address: 192.168.1.1 + listen_port: 8080 + response_code: 200 + response_body: '{"message": "success"}' + url: "/" + prefix: "json" +---- + +Basic auth and SSL example: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: http_endpoint + enabled: true + listen_address: 192.168.1.1 + listen_port: 8080 + ssl.enabled: true + ssl.certificate: "/home/user/server.pem" + ssl.key: "/home/user/server.key" + ssl.verification_mode: "none" + ssl.certificate_authority: "/home/user/ca.pem" + basic_auth: true + username: someuser + password: somepassword +---- + + +==== Configuration options + +The `http_endpoint` input supports the following configuration options plus the +<<{beatname_lc}-input-{type}-common-options>> described later. + +[float] +==== `basic_auth` + +Enables or disables HTTP basic auth for each incoming request. If enabled then `username` and `password` will also need to be configured. + +[float] +==== `username` + +If `basic_auth` is enabled, this is the username used for authentication against the HTTP listener. Requires `password` to also be set. + +[float] +==== `password` + +If `basic_auth` is eanbled, this is the password used for authentication against the HTTP listener. Requires `username` to also be set. + +[float] +==== `response_code` + +The HTTP response code returned upon success. Should be in the 2XX range. + +[float] +==== `response_body` + +The response body returned upon success. + +[float] +==== `listen_address` + +If multiple interfaces is present the `listen_address` can be set to control which IP address the listener binds to. Defaults to `127.0.0.1`. + +[float] +==== `listen_port` + +Which port the listener binds to. Defaults to 8000 + +[float] +==== `url` + +This options specific which URL path to accept requests on. Defaults to `/` + +[float] +==== `prefix` + +This option specifies which prefix the incoming request will be mapped to. + +[id="{beatname_lc}-input-{type}-common-options"] +include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[] + +:type!: diff --git a/x-pack/filebeat/include/list.go b/x-pack/filebeat/include/list.go index 1633307c9508..a5719d18fc73 100644 --- a/x-pack/filebeat/include/list.go +++ b/x-pack/filebeat/include/list.go @@ -11,6 +11,7 @@ import ( _ "github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/googlepubsub" + _ "github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" diff --git a/x-pack/filebeat/input/http_endpoint/config.go b/x-pack/filebeat/input/http_endpoint/config.go new file mode 100644 index 000000000000..0626f5e2afd8 --- /dev/null +++ b/x-pack/filebeat/input/http_endpoint/config.go @@ -0,0 +1,48 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package http_endpoint + +import ( + "encoding/json" + "errors" + + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" +) + +// Config contains information about httpjson configuration +type config struct { + TLS *tlscommon.ServerConfig `config:"ssl"` + BasicAuth bool `config:"basic_auth"` + Username string `config:"username"` + Password string `config:"password"` + ResponseCode int `config:"response_code" validate:"positive"` + ResponseBody string `config:"response_body"` + ListenAddress string `config:"listen_address"` + ListenPort string `config:"listen_port"` + URL string `config:"url"` + Prefix string `config:"prefix"` +} + +func defaultConfig() config { + return config{ + BasicAuth: false, + Username: "", + Password: "", + ResponseCode: 200, + ResponseBody: `{"message": "success"}`, + ListenAddress: "127.0.0.1", + ListenPort: "8000", + URL: "/", + Prefix: "json", + } +} + +func (c *config) Validate() error { + if !json.Valid([]byte(c.ResponseBody)) { + return errors.New("response_body must be valid JSON") + } + + return nil +} diff --git a/x-pack/filebeat/input/http_endpoint/httpserver.go b/x-pack/filebeat/input/http_endpoint/httpserver.go new file mode 100644 index 000000000000..68325caaeb48 --- /dev/null +++ b/x-pack/filebeat/input/http_endpoint/httpserver.go @@ -0,0 +1,77 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package http_endpoint + +import ( + "context" + "net/http" + "time" + + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/logp" +) + +type HttpServer struct { + log *logp.Logger + server *http.Server + ctx context.Context + stop context.CancelFunc +} + +func (h *HttpServer) Start() { + go func() { + if h.server.TLSConfig != nil { + h.log.Infof("Starting HTTPS server on %s", h.server.Addr) + //certificate is already loaded. That's why the parameters are empty + err := h.server.ListenAndServeTLS("", "") + if err != nil && err != http.ErrServerClosed { + h.log.Fatalf("Unable to start HTTPS server due to error: %v", err) + } + } else { + h.log.Infof("Starting HTTP server on %s", h.server.Addr) + err := h.server.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + h.log.Fatalf("Unable to start HTTP server due to error: %v", err) + } + } + }() +} + +func (h *HttpServer) Stop() { + h.log.Info("Stopping HTTP server") + h.stop() + if err := h.server.Shutdown(h.ctx); err != nil { + h.log.Fatalf("Unable to stop HTTP server due to error: %v", err) + } +} + +func createServer(in *HttpEndpoint) (*HttpServer, error) { + mux := http.NewServeMux() + responseHandler := http.HandlerFunc(in.apiResponse) + mux.Handle(in.config.URL, in.validateRequest(responseHandler)) + server := &http.Server{ + Addr: in.config.ListenAddress + ":" + in.config.ListenPort, + Handler: mux, + } + + tlsConfig, err := tlscommon.LoadTLSServerConfig(in.config.TLS) + if err != nil { + return nil, err + } + + if tlsConfig != nil { + server.TLSConfig = tlsConfig.BuildModuleConfig(in.config.ListenAddress) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + h := &HttpServer{ + ctx: ctx, + stop: cancel, + log: logp.NewLogger("http_server"), + } + h.server = server + + return h, nil +} diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go new file mode 100644 index 000000000000..8c6211f8147a --- /dev/null +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -0,0 +1,270 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package http_endpoint + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "sync" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/filebeat/channel" + "github.com/elastic/beats/v7/filebeat/input" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +const ( + inputName = "http_endpoint" +) + +func init() { + err := input.Register(inputName, NewInput) + if err != nil { + panic(errors.Wrapf(err, "failed to register %v input", inputName)) + } +} + +type HttpEndpoint struct { + config + log *logp.Logger + outlet channel.Outleter // Output of received messages. + inputCtx context.Context // Wraps the Done channel from parent input.Context. + + workerCtx context.Context // Worker goroutine context. It's cancelled when the input stops or the worker exits. + workerCancel context.CancelFunc // Used to signal that the worker should stop. + workerOnce sync.Once // Guarantees that the worker goroutine is only started once. + workerWg sync.WaitGroup // Waits on worker goroutine. + server *HttpServer // Server instance + eventObject *map[string]interface{} // Current event object + finalHandler http.HandlerFunc +} + +// NewInput creates a new httpjson input +func NewInput( + cfg *common.Config, + connector channel.Connector, + inputContext input.Context, +) (input.Input, error) { + // Extract and validate the input's configuration. + conf := defaultConfig() + if err := cfg.Unpack(&conf); err != nil { + return nil, err + } + + // Build outlet for events. + out, err := connector.ConnectWith(cfg, beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + DynamicFields: inputContext.DynamicFields, + }, + }) + if err != nil { + return nil, err + } + + // Wrap input.Context's Done channel with a context.Context. This goroutine + // stops with the parent closes the Done channel. + inputCtx, cancelInputCtx := context.WithCancel(context.Background()) + go func() { + defer cancelInputCtx() + select { + case <-inputContext.Done: + case <-inputCtx.Done(): + } + }() + + // If the input ever needs to be made restartable, then context would need + // to be recreated with each restart. + workerCtx, workerCancel := context.WithCancel(inputCtx) + + in := &HttpEndpoint{ + config: conf, + log: logp.NewLogger(inputName), + outlet: out, + inputCtx: inputCtx, + workerCtx: workerCtx, + workerCancel: workerCancel, + } + + // Create an instance of the HTTP server with the beat context + in.server, err = createServer(in) + if err != nil { + return nil, err + } + + in.log.Infof("Initialized %v input on %v:%v", inputName, in.config.ListenAddress, in.config.ListenPort) + + return in, nil +} + +// Run starts the input worker then returns. Only the first invocation +// will ever start the worker. +func (in *HttpEndpoint) Run() { + in.workerOnce.Do(func() { + in.workerWg.Add(1) + go in.run() + }) +} + +func (in *HttpEndpoint) run() { + defer in.workerWg.Done() + defer in.log.Infof("%v worker has stopped.", inputName) + in.server.Start() +} + +// Stops HTTP input and waits for it to finish +func (in *HttpEndpoint) Stop() { + in.workerCancel() + in.workerWg.Wait() +} + +// Wait is an alias for Stop. +func (in *HttpEndpoint) Wait() { + in.Stop() +} + +// If middleware validation successed, event is sent +func (in *HttpEndpoint) sendEvent(w http.ResponseWriter, r *http.Request) { + event := in.outlet.OnEvent(beat.Event{ + Timestamp: time.Now().UTC(), + Fields: common.MapStr{ + in.config.Prefix: in.eventObject, + }, + }) + if !event { + in.sendResponse(w, http.StatusInternalServerError, in.createErrorMessage("Unable to send event")) + } +} + +// Triggers if middleware validation returns successful +func (in *HttpEndpoint) apiResponse(w http.ResponseWriter, r *http.Request) { + in.sendEvent(w, r) + w.Header().Add("Content-Type", "application/json") + in.sendResponse(w, uint(in.config.ResponseCode), in.config.ResponseBody) +} + +func (in *HttpEndpoint) sendResponse(w http.ResponseWriter, h uint, b string) { + w.WriteHeader(int(h)) + w.Write([]byte(b)) +} + +// Runs all validations for each request +func (in *HttpEndpoint) validateRequest(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if in.config.BasicAuth { + status, err := in.validateAuth(w, r) + if err != "" && status != 0 { + in.sendResponse(w, status, err) + return + } + } + + status, err := in.validateMethod(w, r) + if err != "" && status != 0 { + in.sendResponse(w, status, err) + return + } + + status, err = in.validateHeader(w, r) + if err != "" && status != 0 { + in.sendResponse(w, status, err) + return + } + + status, err = in.validateBody(w, r) + if err != "" && status != 0 { + in.sendResponse(w, status, err) + return + } + + next.ServeHTTP(w, r) + }) +} + +// Validate that only supported Accept and Content type headers are used +func (in *HttpEndpoint) validateHeader(w http.ResponseWriter, r *http.Request) (uint, string) { + if r.Header.Get("Content-Type") != "application/json" { + return http.StatusUnsupportedMediaType, in.createErrorMessage("Wrong Content-Type header, expecting application/json") + } + + if r.Header.Get("Accept") != "application/json" { + return http.StatusNotAcceptable, in.createErrorMessage("Wrong Accept header, expecting application/json") + } + return 0, "" +} + +// Validate if headers are current and authentication is successful +func (in *HttpEndpoint) validateAuth(w http.ResponseWriter, r *http.Request) (uint, string) { + if in.config.Username == "" || in.config.Password == "" { + return http.StatusUnauthorized, in.createErrorMessage("Username and password required when basicauth is enabled") + } + + username, password, _ := r.BasicAuth() + if in.config.Username != username || in.config.Password != password { + return http.StatusUnauthorized, in.createErrorMessage("Incorrect username or password") + } + + return 0, "" +} + +// Validates that body is not empty, not a list of objects and valid JSON +func (in *HttpEndpoint) validateBody(w http.ResponseWriter, r *http.Request) (uint, string) { + if r.Body == http.NoBody { + return http.StatusNotAcceptable, in.createErrorMessage("Body cannot be empty") + } + + body, err := ioutil.ReadAll(r.Body) + if err != nil { + return http.StatusInternalServerError, in.createErrorMessage("Unable to read body") + } + + isObject := in.isObjectOrList(body) + if isObject == "list" { + return http.StatusBadRequest, in.createErrorMessage("List of JSON objects is not supported") + } + + objmap := make(map[string]interface{}) + err = json.Unmarshal(body, &objmap) + if err != nil { + return http.StatusBadRequest, in.createErrorMessage("Malformed JSON body") + } + + in.eventObject = &objmap + + return 0, "" +} + +// Ensure only valid HTTP Methods used +func (in *HttpEndpoint) validateMethod(w http.ResponseWriter, r *http.Request) (uint, string) { + if r.Method != http.MethodPost { + return http.StatusMethodNotAllowed, in.createErrorMessage("Only POST requests supported") + } + + return 0, "" +} + +func (in *HttpEndpoint) createErrorMessage(r string) string { + return fmt.Sprintf(`{"message": "%v"}`, r) +} + +func (in *HttpEndpoint) isObjectOrList(b []byte) string { + obj := bytes.TrimLeft(b, " \t\r\n") + if len(obj) > 0 && obj[0] == '{' { + return "object" + } + + if len(obj) > 0 && obj[0] == '[' { + return "list" + } + + return "" +} diff --git a/x-pack/filebeat/tests/system/test_http_endpoint.py b/x-pack/filebeat/tests/system/test_http_endpoint.py new file mode 100644 index 000000000000..4203918a693e --- /dev/null +++ b/x-pack/filebeat/tests/system/test_http_endpoint.py @@ -0,0 +1,218 @@ +import jinja2 +import requests +import sys +import os +import json +from requests.auth import HTTPBasicAuth + +sys.path.append(os.path.join(os.path.dirname(__file__), '../../../../filebeat/tests/system')) + +from filebeat import BaseTest + + +class Test(BaseTest): + """ + Test filebeat with the http_endpoint input + """ + @classmethod + def setUpClass(self): + self.beat_name = "filebeat" + self.beat_path = os.path.abspath( + os.path.join(os.path.dirname(__file__), "../../")) + + super(BaseTest, self).setUpClass() + + def setUp(self): + super(BaseTest, self).setUp() + + # Hack to make jinja2 have the right paths + self.template_env = jinja2.Environment( + loader=jinja2.FileSystemLoader([ + os.path.abspath(os.path.join(self.beat_path, "../../filebeat")), + os.path.abspath(os.path.join(self.beat_path, "../../libbeat")) + ]) + ) + + def get_config(self, options=None): + """ + General function so that we do not have to define settings each time + """ + host = "127.0.0.1" + port = 8081 + input_raw = """ +- type: http_endpoint + enabled: true + listen_address: {} + listen_port: {} +""" + if options: + input_raw = '\n'.join([input_raw, options]) + self.beat_name = "filebeat" + self.beat_path = os.path.abspath( + os.path.join(os.path.dirname(__file__), "../../")) + + input_raw = input_raw.format(host, port) + self.render_config_template( + input_raw=input_raw, + inputs=False, + ) + self.host = host + self.port = port + self.prefix = 'testmessage' + self.url = "http://{}:{}/".format(host, port) + + def test_http_endpoint_request(self): + """ + Test http_endpoint input with HTTP events. + """ + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) + + message = "somerandommessage" + payload = {self.prefix: message} + headers = {"Content-Type": "application/json", "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data=json.dumps(payload)) + + self.wait_until(lambda: self.output_count(lambda x: x >= 1)) + filebeat.check_kill_and_wait() + + output = self.read_output() + + assert output[0]["input.type"] == "http_endpoint" + assert output[0]["json.{}".format(self.prefix)] == message + assert r.text == '{"message": "success"}' + + def test_http_endpoint_wrong_content_header(self): + """ + Test http_endpoint input with wrong content header. + """ + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) + + message = "somerandommessage" + payload = {self.prefix: message} + headers = {"Content-Type": "application/xml", "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data=json.dumps(payload)) + + filebeat.check_kill_and_wait() + + assert r.status_code == 415 + assert r.text == '{"message": "Wrong Content-Type header, expecting application/json"}' + + def test_http_endpoint_wrong_accept_header(self): + """ + Test http_endpoint input with wrong accept header. + """ + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) + + message = "somerandommessage" + payload = {self.prefix: message} + headers = {"Content-Type": "application/json", "Accept": "application/xml"} + r = requests.post(self.url, headers=headers, data=json.dumps(payload)) + + filebeat.check_kill_and_wait() + + assert r.status_code == 406 + assert r.text == '{"message": "Wrong Accept header, expecting application/json"}' + + def test_http_endpoint_missing_auth_value(self): + """ + Test http_endpoint input with missing basic auth values. + """ + options = """ + basic_auth: true + username: testuser + password: +""" + self.get_config(options) + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) + + message = "somerandommessage" + payload = {self.prefix: message} + headers = {"Content-Type": "application/json", "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data=json.dumps( + payload), auth=HTTPBasicAuth('testuser', 'something')) + + filebeat.check_kill_and_wait() + + assert r.status_code == 401 + assert r.text == '{"message": "Username and password required when basicauth is enabled"}' + + def test_http_endpoint_wrong_auth_value(self): + """ + Test http_endpoint input with wrong basic auth values. + """ + options = """ + basic_auth: true + username: testuser + password: testpassword +""" + self.get_config(options) + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) + + message = "somerandommessage" + payload = {self.prefix: message} + headers = {"Content-Type": "application/json", "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data=json.dumps(payload), auth=HTTPBasicAuth('testuser', 'qwerty')) + + filebeat.check_kill_and_wait() + + assert r.status_code == 401 + assert r.text == '{"message": "Incorrect username or password"}' + + def test_http_endpoint_empty_body(self): + """ + Test http_endpoint input with empty body. + """ + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) + + headers = {"Content-Type": "application/json", "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data="") + + filebeat.check_kill_and_wait() + + assert r.status_code == 406 + assert r.text == '{"message": "Body cannot be empty"}' + + def test_http_endpoint_malformed_json(self): + """ + Test http_endpoint input with malformed body. + """ + + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) + payload = '{"message::":: "something"}' + headers = {"Content-Type": "application/json", "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data=payload) + + filebeat.check_kill_and_wait() + + assert r.status_code == 400 + assert r.text == '{"message": "Malformed JSON body"}' + + def test_http_endpoint_get_request(self): + """ + Test http_endpoint input with GET request. + """ + + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) + message = "somerandommessage" + payload = {self.prefix: message} + headers = {"Content-Type": "application/json", "Accept": "application/json"} + r = requests.get(self.url, headers=headers, data=json.dumps(payload)) + + filebeat.check_kill_and_wait() + + assert r.status_code == 405 + assert r.text == '{"message": "Only POST requests supported"}'