diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 49093b91964..be2b6c708e7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -412,6 +412,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Improve ECS categorization field mappings in osquery module. {issue}16176[16176] {pull}17881[17881] - Add support for v10, v11 and v12 logs on Postgres {issue}13810[13810] {pull}17732[17732] - Add dashboard for Google Cloud Audit and AWS CloudTrail. {pull}17379[17379] +- Added http_endpoint input{pull}18298[18298] - Add support for array parsing in azure-eventhub input. {pull}18585[18585] - The `logstash` module can now automatically detect the log file format (JSON or plaintext) and process it accordingly. {issue}9964[9964] {pull}18095[18095] - Improve ECS categorization field mappings in coredns module. {issue}16159[16159] {pull}18424[18424] diff --git a/filebeat/docs/filebeat-options.asciidoc b/filebeat/docs/filebeat-options.asciidoc index 6f9a49d43fd..45658880854 100644 --- a/filebeat/docs/filebeat-options.asciidoc +++ b/filebeat/docs/filebeat-options.asciidoc @@ -47,6 +47,7 @@ You can configure {beatname_uc} to use the following inputs: * <<{beatname_lc}-input-container>> * <<{beatname_lc}-input-docker>> * <<{beatname_lc}-input-google-pubsub>> +* <<{beatname_lc}-input-http_endpoint>> * <<{beatname_lc}-input-httpjson>> * <<{beatname_lc}-input-kafka>> * <<{beatname_lc}-input-log>> @@ -73,6 +74,8 @@ include::inputs/input-docker.asciidoc[] include::../../x-pack/filebeat/docs/inputs/input-google-pubsub.asciidoc[] +include::../../x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc[] + include::../../x-pack/filebeat/docs/inputs/input-httpjson.asciidoc[] include::inputs/input-kafka.asciidoc[] diff --git a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc new file mode 100644 index 00000000000..2a949b01d26 --- /dev/null +++ b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc @@ -0,0 +1,116 @@ +[role="xpack"] + +:type: http_endpoint + +[id="{beatname_lc}-input-{type}"] +=== HTTP Endpoint input + +++++ +HTTP Endpoint +++++ + +beta[] + +Use the `http_endpoint` input to create a HTTP listener that can receive incoming HTTP POST requests. + +This input can for example be used to receive incoming webhooks from a third-party application or service. + +Example configurations: + +Basic example: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: http_endpoint + enabled: true + listen_address: 192.168.1.1 + listen_port: 8080 +---- + +Custom response example: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: http_endpoint + enabled: true + listen_address: 192.168.1.1 + listen_port: 8080 + response_code: 200 + response_body: '{"message": "success"}' + url: "/" + prefix: "json" +---- + +Basic auth and SSL example: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: http_endpoint + enabled: true + listen_address: 192.168.1.1 + listen_port: 8080 + ssl.enabled: true + ssl.certificate: "/home/user/server.pem" + ssl.key: "/home/user/server.key" + ssl.verification_mode: "none" + ssl.certificate_authority: "/home/user/ca.pem" + basic_auth: true + username: someuser + password: somepassword +---- + + +==== Configuration options + +The `http_endpoint` input supports the following configuration options plus the +<<{beatname_lc}-input-{type}-common-options>> described later. + +[float] +==== `basic_auth` + +Enables or disables HTTP basic auth for each incoming request. If enabled then `username` and `password` will also need to be configured. + +[float] +==== `username` + +If `basic_auth` is enabled, this is the username used for authentication against the HTTP listener. Requires `password` to also be set. + +[float] +==== `password` + +If `basic_auth` is eanbled, this is the password used for authentication against the HTTP listener. Requires `username` to also be set. + +[float] +==== `response_code` + +The HTTP response code returned upon success. Should be in the 2XX range. + +[float] +==== `response_body` + +The response body returned upon success. + +[float] +==== `listen_address` + +If multiple interfaces is present the `listen_address` can be set to control which IP address the listener binds to. Defaults to `127.0.0.1`. + +[float] +==== `listen_port` + +Which port the listener binds to. Defaults to 8000 + +[float] +==== `url` + +This options specific which URL path to accept requests on. Defaults to `/` + +[float] +==== `prefix` + +This option specifies which prefix the incoming request will be mapped to. + +[id="{beatname_lc}-input-{type}-common-options"] +include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[] + +:type!: diff --git a/x-pack/filebeat/include/list.go b/x-pack/filebeat/include/list.go index 1633307c950..a5719d18fc7 100644 --- a/x-pack/filebeat/include/list.go +++ b/x-pack/filebeat/include/list.go @@ -11,6 +11,7 @@ import ( _ "github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/googlepubsub" + _ "github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" diff --git a/x-pack/filebeat/input/http_endpoint/config.go b/x-pack/filebeat/input/http_endpoint/config.go new file mode 100644 index 00000000000..0626f5e2afd --- /dev/null +++ b/x-pack/filebeat/input/http_endpoint/config.go @@ -0,0 +1,48 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package http_endpoint + +import ( + "encoding/json" + "errors" + + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" +) + +// Config contains information about httpjson configuration +type config struct { + TLS *tlscommon.ServerConfig `config:"ssl"` + BasicAuth bool `config:"basic_auth"` + Username string `config:"username"` + Password string `config:"password"` + ResponseCode int `config:"response_code" validate:"positive"` + ResponseBody string `config:"response_body"` + ListenAddress string `config:"listen_address"` + ListenPort string `config:"listen_port"` + URL string `config:"url"` + Prefix string `config:"prefix"` +} + +func defaultConfig() config { + return config{ + BasicAuth: false, + Username: "", + Password: "", + ResponseCode: 200, + ResponseBody: `{"message": "success"}`, + ListenAddress: "127.0.0.1", + ListenPort: "8000", + URL: "/", + Prefix: "json", + } +} + +func (c *config) Validate() error { + if !json.Valid([]byte(c.ResponseBody)) { + return errors.New("response_body must be valid JSON") + } + + return nil +} diff --git a/x-pack/filebeat/input/http_endpoint/httpserver.go b/x-pack/filebeat/input/http_endpoint/httpserver.go new file mode 100644 index 00000000000..68325caaeb4 --- /dev/null +++ b/x-pack/filebeat/input/http_endpoint/httpserver.go @@ -0,0 +1,77 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package http_endpoint + +import ( + "context" + "net/http" + "time" + + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/logp" +) + +type HttpServer struct { + log *logp.Logger + server *http.Server + ctx context.Context + stop context.CancelFunc +} + +func (h *HttpServer) Start() { + go func() { + if h.server.TLSConfig != nil { + h.log.Infof("Starting HTTPS server on %s", h.server.Addr) + //certificate is already loaded. That's why the parameters are empty + err := h.server.ListenAndServeTLS("", "") + if err != nil && err != http.ErrServerClosed { + h.log.Fatalf("Unable to start HTTPS server due to error: %v", err) + } + } else { + h.log.Infof("Starting HTTP server on %s", h.server.Addr) + err := h.server.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + h.log.Fatalf("Unable to start HTTP server due to error: %v", err) + } + } + }() +} + +func (h *HttpServer) Stop() { + h.log.Info("Stopping HTTP server") + h.stop() + if err := h.server.Shutdown(h.ctx); err != nil { + h.log.Fatalf("Unable to stop HTTP server due to error: %v", err) + } +} + +func createServer(in *HttpEndpoint) (*HttpServer, error) { + mux := http.NewServeMux() + responseHandler := http.HandlerFunc(in.apiResponse) + mux.Handle(in.config.URL, in.validateRequest(responseHandler)) + server := &http.Server{ + Addr: in.config.ListenAddress + ":" + in.config.ListenPort, + Handler: mux, + } + + tlsConfig, err := tlscommon.LoadTLSServerConfig(in.config.TLS) + if err != nil { + return nil, err + } + + if tlsConfig != nil { + server.TLSConfig = tlsConfig.BuildModuleConfig(in.config.ListenAddress) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + h := &HttpServer{ + ctx: ctx, + stop: cancel, + log: logp.NewLogger("http_server"), + } + h.server = server + + return h, nil +} diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go new file mode 100644 index 00000000000..8c6211f8147 --- /dev/null +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -0,0 +1,270 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package http_endpoint + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "sync" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/filebeat/channel" + "github.com/elastic/beats/v7/filebeat/input" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +const ( + inputName = "http_endpoint" +) + +func init() { + err := input.Register(inputName, NewInput) + if err != nil { + panic(errors.Wrapf(err, "failed to register %v input", inputName)) + } +} + +type HttpEndpoint struct { + config + log *logp.Logger + outlet channel.Outleter // Output of received messages. + inputCtx context.Context // Wraps the Done channel from parent input.Context. + + workerCtx context.Context // Worker goroutine context. It's cancelled when the input stops or the worker exits. + workerCancel context.CancelFunc // Used to signal that the worker should stop. + workerOnce sync.Once // Guarantees that the worker goroutine is only started once. + workerWg sync.WaitGroup // Waits on worker goroutine. + server *HttpServer // Server instance + eventObject *map[string]interface{} // Current event object + finalHandler http.HandlerFunc +} + +// NewInput creates a new httpjson input +func NewInput( + cfg *common.Config, + connector channel.Connector, + inputContext input.Context, +) (input.Input, error) { + // Extract and validate the input's configuration. + conf := defaultConfig() + if err := cfg.Unpack(&conf); err != nil { + return nil, err + } + + // Build outlet for events. + out, err := connector.ConnectWith(cfg, beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + DynamicFields: inputContext.DynamicFields, + }, + }) + if err != nil { + return nil, err + } + + // Wrap input.Context's Done channel with a context.Context. This goroutine + // stops with the parent closes the Done channel. + inputCtx, cancelInputCtx := context.WithCancel(context.Background()) + go func() { + defer cancelInputCtx() + select { + case <-inputContext.Done: + case <-inputCtx.Done(): + } + }() + + // If the input ever needs to be made restartable, then context would need + // to be recreated with each restart. + workerCtx, workerCancel := context.WithCancel(inputCtx) + + in := &HttpEndpoint{ + config: conf, + log: logp.NewLogger(inputName), + outlet: out, + inputCtx: inputCtx, + workerCtx: workerCtx, + workerCancel: workerCancel, + } + + // Create an instance of the HTTP server with the beat context + in.server, err = createServer(in) + if err != nil { + return nil, err + } + + in.log.Infof("Initialized %v input on %v:%v", inputName, in.config.ListenAddress, in.config.ListenPort) + + return in, nil +} + +// Run starts the input worker then returns. Only the first invocation +// will ever start the worker. +func (in *HttpEndpoint) Run() { + in.workerOnce.Do(func() { + in.workerWg.Add(1) + go in.run() + }) +} + +func (in *HttpEndpoint) run() { + defer in.workerWg.Done() + defer in.log.Infof("%v worker has stopped.", inputName) + in.server.Start() +} + +// Stops HTTP input and waits for it to finish +func (in *HttpEndpoint) Stop() { + in.workerCancel() + in.workerWg.Wait() +} + +// Wait is an alias for Stop. +func (in *HttpEndpoint) Wait() { + in.Stop() +} + +// If middleware validation successed, event is sent +func (in *HttpEndpoint) sendEvent(w http.ResponseWriter, r *http.Request) { + event := in.outlet.OnEvent(beat.Event{ + Timestamp: time.Now().UTC(), + Fields: common.MapStr{ + in.config.Prefix: in.eventObject, + }, + }) + if !event { + in.sendResponse(w, http.StatusInternalServerError, in.createErrorMessage("Unable to send event")) + } +} + +// Triggers if middleware validation returns successful +func (in *HttpEndpoint) apiResponse(w http.ResponseWriter, r *http.Request) { + in.sendEvent(w, r) + w.Header().Add("Content-Type", "application/json") + in.sendResponse(w, uint(in.config.ResponseCode), in.config.ResponseBody) +} + +func (in *HttpEndpoint) sendResponse(w http.ResponseWriter, h uint, b string) { + w.WriteHeader(int(h)) + w.Write([]byte(b)) +} + +// Runs all validations for each request +func (in *HttpEndpoint) validateRequest(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if in.config.BasicAuth { + status, err := in.validateAuth(w, r) + if err != "" && status != 0 { + in.sendResponse(w, status, err) + return + } + } + + status, err := in.validateMethod(w, r) + if err != "" && status != 0 { + in.sendResponse(w, status, err) + return + } + + status, err = in.validateHeader(w, r) + if err != "" && status != 0 { + in.sendResponse(w, status, err) + return + } + + status, err = in.validateBody(w, r) + if err != "" && status != 0 { + in.sendResponse(w, status, err) + return + } + + next.ServeHTTP(w, r) + }) +} + +// Validate that only supported Accept and Content type headers are used +func (in *HttpEndpoint) validateHeader(w http.ResponseWriter, r *http.Request) (uint, string) { + if r.Header.Get("Content-Type") != "application/json" { + return http.StatusUnsupportedMediaType, in.createErrorMessage("Wrong Content-Type header, expecting application/json") + } + + if r.Header.Get("Accept") != "application/json" { + return http.StatusNotAcceptable, in.createErrorMessage("Wrong Accept header, expecting application/json") + } + return 0, "" +} + +// Validate if headers are current and authentication is successful +func (in *HttpEndpoint) validateAuth(w http.ResponseWriter, r *http.Request) (uint, string) { + if in.config.Username == "" || in.config.Password == "" { + return http.StatusUnauthorized, in.createErrorMessage("Username and password required when basicauth is enabled") + } + + username, password, _ := r.BasicAuth() + if in.config.Username != username || in.config.Password != password { + return http.StatusUnauthorized, in.createErrorMessage("Incorrect username or password") + } + + return 0, "" +} + +// Validates that body is not empty, not a list of objects and valid JSON +func (in *HttpEndpoint) validateBody(w http.ResponseWriter, r *http.Request) (uint, string) { + if r.Body == http.NoBody { + return http.StatusNotAcceptable, in.createErrorMessage("Body cannot be empty") + } + + body, err := ioutil.ReadAll(r.Body) + if err != nil { + return http.StatusInternalServerError, in.createErrorMessage("Unable to read body") + } + + isObject := in.isObjectOrList(body) + if isObject == "list" { + return http.StatusBadRequest, in.createErrorMessage("List of JSON objects is not supported") + } + + objmap := make(map[string]interface{}) + err = json.Unmarshal(body, &objmap) + if err != nil { + return http.StatusBadRequest, in.createErrorMessage("Malformed JSON body") + } + + in.eventObject = &objmap + + return 0, "" +} + +// Ensure only valid HTTP Methods used +func (in *HttpEndpoint) validateMethod(w http.ResponseWriter, r *http.Request) (uint, string) { + if r.Method != http.MethodPost { + return http.StatusMethodNotAllowed, in.createErrorMessage("Only POST requests supported") + } + + return 0, "" +} + +func (in *HttpEndpoint) createErrorMessage(r string) string { + return fmt.Sprintf(`{"message": "%v"}`, r) +} + +func (in *HttpEndpoint) isObjectOrList(b []byte) string { + obj := bytes.TrimLeft(b, " \t\r\n") + if len(obj) > 0 && obj[0] == '{' { + return "object" + } + + if len(obj) > 0 && obj[0] == '[' { + return "list" + } + + return "" +} diff --git a/x-pack/filebeat/tests/system/test_http_endpoint.py b/x-pack/filebeat/tests/system/test_http_endpoint.py new file mode 100644 index 00000000000..4203918a693 --- /dev/null +++ b/x-pack/filebeat/tests/system/test_http_endpoint.py @@ -0,0 +1,218 @@ +import jinja2 +import requests +import sys +import os +import json +from requests.auth import HTTPBasicAuth + +sys.path.append(os.path.join(os.path.dirname(__file__), '../../../../filebeat/tests/system')) + +from filebeat import BaseTest + + +class Test(BaseTest): + """ + Test filebeat with the http_endpoint input + """ + @classmethod + def setUpClass(self): + self.beat_name = "filebeat" + self.beat_path = os.path.abspath( + os.path.join(os.path.dirname(__file__), "../../")) + + super(BaseTest, self).setUpClass() + + def setUp(self): + super(BaseTest, self).setUp() + + # Hack to make jinja2 have the right paths + self.template_env = jinja2.Environment( + loader=jinja2.FileSystemLoader([ + os.path.abspath(os.path.join(self.beat_path, "../../filebeat")), + os.path.abspath(os.path.join(self.beat_path, "../../libbeat")) + ]) + ) + + def get_config(self, options=None): + """ + General function so that we do not have to define settings each time + """ + host = "127.0.0.1" + port = 8081 + input_raw = """ +- type: http_endpoint + enabled: true + listen_address: {} + listen_port: {} +""" + if options: + input_raw = '\n'.join([input_raw, options]) + self.beat_name = "filebeat" + self.beat_path = os.path.abspath( + os.path.join(os.path.dirname(__file__), "../../")) + + input_raw = input_raw.format(host, port) + self.render_config_template( + input_raw=input_raw, + inputs=False, + ) + self.host = host + self.port = port + self.prefix = 'testmessage' + self.url = "http://{}:{}/".format(host, port) + + def test_http_endpoint_request(self): + """ + Test http_endpoint input with HTTP events. + """ + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) + + message = "somerandommessage" + payload = {self.prefix: message} + headers = {"Content-Type": "application/json", "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data=json.dumps(payload)) + + self.wait_until(lambda: self.output_count(lambda x: x >= 1)) + filebeat.check_kill_and_wait() + + output = self.read_output() + + assert output[0]["input.type"] == "http_endpoint" + assert output[0]["json.{}".format(self.prefix)] == message + assert r.text == '{"message": "success"}' + + def test_http_endpoint_wrong_content_header(self): + """ + Test http_endpoint input with wrong content header. + """ + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) + + message = "somerandommessage" + payload = {self.prefix: message} + headers = {"Content-Type": "application/xml", "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data=json.dumps(payload)) + + filebeat.check_kill_and_wait() + + assert r.status_code == 415 + assert r.text == '{"message": "Wrong Content-Type header, expecting application/json"}' + + def test_http_endpoint_wrong_accept_header(self): + """ + Test http_endpoint input with wrong accept header. + """ + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) + + message = "somerandommessage" + payload = {self.prefix: message} + headers = {"Content-Type": "application/json", "Accept": "application/xml"} + r = requests.post(self.url, headers=headers, data=json.dumps(payload)) + + filebeat.check_kill_and_wait() + + assert r.status_code == 406 + assert r.text == '{"message": "Wrong Accept header, expecting application/json"}' + + def test_http_endpoint_missing_auth_value(self): + """ + Test http_endpoint input with missing basic auth values. + """ + options = """ + basic_auth: true + username: testuser + password: +""" + self.get_config(options) + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) + + message = "somerandommessage" + payload = {self.prefix: message} + headers = {"Content-Type": "application/json", "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data=json.dumps( + payload), auth=HTTPBasicAuth('testuser', 'something')) + + filebeat.check_kill_and_wait() + + assert r.status_code == 401 + assert r.text == '{"message": "Username and password required when basicauth is enabled"}' + + def test_http_endpoint_wrong_auth_value(self): + """ + Test http_endpoint input with wrong basic auth values. + """ + options = """ + basic_auth: true + username: testuser + password: testpassword +""" + self.get_config(options) + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) + + message = "somerandommessage" + payload = {self.prefix: message} + headers = {"Content-Type": "application/json", "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data=json.dumps(payload), auth=HTTPBasicAuth('testuser', 'qwerty')) + + filebeat.check_kill_and_wait() + + assert r.status_code == 401 + assert r.text == '{"message": "Incorrect username or password"}' + + def test_http_endpoint_empty_body(self): + """ + Test http_endpoint input with empty body. + """ + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) + + headers = {"Content-Type": "application/json", "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data="") + + filebeat.check_kill_and_wait() + + assert r.status_code == 406 + assert r.text == '{"message": "Body cannot be empty"}' + + def test_http_endpoint_malformed_json(self): + """ + Test http_endpoint input with malformed body. + """ + + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) + payload = '{"message::":: "something"}' + headers = {"Content-Type": "application/json", "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data=payload) + + filebeat.check_kill_and_wait() + + assert r.status_code == 400 + assert r.text == '{"message": "Malformed JSON body"}' + + def test_http_endpoint_get_request(self): + """ + Test http_endpoint input with GET request. + """ + + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) + message = "somerandommessage" + payload = {self.prefix: message} + headers = {"Content-Type": "application/json", "Accept": "application/json"} + r = requests.get(self.url, headers=headers, data=json.dumps(payload)) + + filebeat.check_kill_and_wait() + + assert r.status_code == 405 + assert r.text == '{"message": "Only POST requests supported"}'