Skip to content

Commit

Permalink
Improve error when ES Ingest node plugins are not loaded (#3676)
Browse files Browse the repository at this point in the history
* Improve error when ES ingest node plugins are not loaded

We're parsing the Elasticsearch JSON error and try to produce an
error message that is as helpful as possible. The following cases
are detected:

* A plugin providing a processor is missing. In case the plugin is one of
  `ingest-geoip` or `ingest-user-agent`, we can also suggest the command that
  installs them.
* Elasticsearch < 5.0. We now detect this and tell the user that ES 5.0 is
  required by FBM.

A drawback of this approach is that if both the GeoIP and User-Agent plugins
are missing, only one will be reported. This might get solved by including the
user-agent one in ES, or by improving the error we get from ES, or by us querying
the node stats API

Note: this contains a change in the ES client, which makes it return the body
in case of errors. I think we need that part anyway, otherwise we often show
errors like `400 Bad request` without any other details. I tried to do a minimal
change there, I hope I didn't introduce any changes in behaviour.

* Move error after []byte in the returned values

* addressed comments and added more tests
  • Loading branch information
tsg authored and Steffen Siering committed Mar 1, 2017
1 parent 5da7872 commit 96bb79b
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 18 deletions.
72 changes: 69 additions & 3 deletions filebeat/fileset/modules.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package fileset

import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -245,7 +247,7 @@ func (reg *ModuleRegistry) GetProspectorConfigs() ([]*common.Config, error) {
// PipelineLoader is a subset of the Elasticsearch client API capable of loading
// the pipelines.
type PipelineLoader interface {
LoadJSON(path string, json map[string]interface{}) error
LoadJSON(path string, json map[string]interface{}) ([]byte, error)
Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error)
}

Expand Down Expand Up @@ -273,14 +275,78 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string
logp.Debug("modules", "Pipeline %s already loaded", pipelineID)
return nil
}
err := esClient.LoadJSON(path, content)
body, err := esClient.LoadJSON(path, content)
if err != nil {
return fmt.Errorf("couldn't load template: %v", err)
return interpretError(err, body)
}
logp.Info("Elasticsearch pipeline with ID '%s' loaded", pipelineID)
return nil
}

func interpretError(initialErr error, body []byte) error {
var response struct {
Error struct {
RootCause []struct {
Type string `json:"type"`
Reason string `json:"reason"`
Header struct {
ProcessorType string `json:"processor_type"`
} `json:"header"`
Index string `json:"index"`
} `json:"root_cause"`
} `json:"error"`
}
err := json.Unmarshal(body, &response)
if err != nil {
// this might be ES < 2.0. Do a best effort to check for ES 1.x
var response1x struct {
Error string `json:"error"`
}
err1x := json.Unmarshal(body, &response1x)
if err1x == nil && response1x.Error != "" {
return fmt.Errorf("The Filebeat modules require Elasticsearch >= 5.0. "+
"This is the response I got from Elasticsearch: %s", body)
}

return fmt.Errorf("couldn't load pipeline: %v. Additionally, error decoding response body: %s",
initialErr, body)
}

// missing plugins?
if len(response.Error.RootCause) > 0 &&
response.Error.RootCause[0].Type == "parse_exception" &&
strings.HasPrefix(response.Error.RootCause[0].Reason, "No processor type exists with name") &&
response.Error.RootCause[0].Header.ProcessorType != "" {

plugins := map[string]string{
"geoip": "ingest-geoip",
"user_agent": "ingest-user-agent",
}
plugin, ok := plugins[response.Error.RootCause[0].Header.ProcessorType]
if !ok {
return fmt.Errorf("This module requires an Elasticsearch plugin that provides the %s processor. "+
"Please visit the Elasticsearch documentation for instructions on how to install this plugin. "+
"Response body: %s", response.Error.RootCause[0].Header.ProcessorType, body)
}

return fmt.Errorf("This module requires the %s plugin to be installed in Elasticsearch. "+
"You can installing using the following command in the Elasticsearch home directory:\n"+
" sudo bin/elasticsearch-plugin install %s", plugin, plugin)
}

// older ES version?
if len(response.Error.RootCause) > 0 &&
response.Error.RootCause[0].Type == "invalid_index_name_exception" &&
response.Error.RootCause[0].Index == "_ingest" {

return fmt.Errorf("The Ingest Node functionality seems to be missing from Elasticsearch. "+
"The Filebeat modules require Elasticsearch >= 5.0. "+
"This is the response I got from Elasticsearch: %s", body)
}

return fmt.Errorf("couldn't load pipeline: %v. Response body: %s", initialErr, body)
}

func (reg *ModuleRegistry) Empty() bool {
count := 0
for _, filesets := range reg.registry {
Expand Down
53 changes: 53 additions & 0 deletions filebeat/fileset/modules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package fileset

import (
"errors"
"fmt"
"path/filepath"
"testing"
Expand Down Expand Up @@ -344,3 +345,55 @@ func TestMissingModuleFolder(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 0, len(prospectors))
}

func TestInterpretError(t *testing.T) {
tests := []struct {
Test string
Input string
Output string
}{
{
Test: "geoip not installed",
Input: `{"error":{"root_cause":[{"type":"parse_exception","reason":"No processor type exists with name [geoip]","header":{"processor_type":"geoip"}}],"type":"parse_exception","reason":"No processor type exists with name [geoip]","header":{"processor_type":"geoip"}},"status":400}`,
Output: "This module requires the ingest-geoip plugin to be installed in Elasticsearch. You can installing using the following command in the Elasticsearch home directory:\n sudo bin/elasticsearch-plugin install ingest-geoip",
},
{
Test: "user-agent not installed",
Input: `{"error":{"root_cause":[{"type":"parse_exception","reason":"No processor type exists with name [user_agent]","header":{"processor_type":"user_agent"}}],"type":"parse_exception","reason":"No processor type exists with name [user_agent]","header":{"processor_type":"user_agent"}},"status":400}`,
Output: "This module requires the ingest-user-agent plugin to be installed in Elasticsearch. You can installing using the following command in the Elasticsearch home directory:\n sudo bin/elasticsearch-plugin install ingest-user-agent",
},
{
Test: "other plugin not installed",
Input: `{"error":{"root_cause":[{"type":"parse_exception","reason":"No processor type exists with name [hello_test]","header":{"processor_type":"hello_test"}}],"type":"parse_exception","reason":"No processor type exists with name [hello_test]","header":{"processor_type":"hello_test"}},"status":400}`,
Output: "This module requires an Elasticsearch plugin that provides the hello_test processor. " +
"Please visit the Elasticsearch documentation for instructions on how to install this plugin. " +
"Response body: " + `{"error":{"root_cause":[{"type":"parse_exception","reason":"No processor type exists with name [hello_test]","header":{"processor_type":"hello_test"}}],"type":"parse_exception","reason":"No processor type exists with name [hello_test]","header":{"processor_type":"hello_test"}},"status":400}`,
},
{
Test: "Elasticsearch 2.4",
Input: `{"error":{"root_cause":[{"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"}],"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"},"status":400}`,
Output: `The Ingest Node functionality seems to be missing from Elasticsearch. The Filebeat modules require Elasticsearch >= 5.0. This is the response I got from Elasticsearch: {"error":{"root_cause":[{"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"}],"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"},"status":400}`,
},
{
Test: "Elasticsearch 1.7",
Input: `{"error":"InvalidIndexNameException[[_ingest] Invalid index name [_ingest], must not start with '_']","status":400}`,
Output: `The Filebeat modules require Elasticsearch >= 5.0. This is the response I got from Elasticsearch: {"error":"InvalidIndexNameException[[_ingest] Invalid index name [_ingest], must not start with '_']","status":400}`,
},
{
Test: "bad json",
Input: `blah`,
Output: `couldn't load pipeline: test. Additionally, error decoding response body: blah`,
},
{
Test: "another error",
Input: `{"error":{"root_cause":[{"type":"test","reason":""}],"type":"test","reason":""},"status":400}`,
Output: "couldn't load pipeline: test. Response body: " +
`{"error":{"root_cause":[{"type":"test","reason":""}],"type":"test","reason":""},"status":400}`,
},
}

for _, test := range tests {
errResult := interpretError(errors.New("test"), []byte(test.Input))
assert.Equal(t, errResult.Error(), test.Output, test.Test)
}
}
2 changes: 1 addition & 1 deletion libbeat/dashboards/dashboards/dashboards.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
// DashboardLoader is a subset of the Elasticsearch client API capable of
// loading the dashboards.
type DashboardLoader interface {
LoadJSON(path string, json map[string]interface{}) error
LoadJSON(path string, json map[string]interface{}) ([]byte, error)
CreateIndex(index string, body interface{}) (int, *elasticsearch.QueryResult, error)
}

Expand Down
8 changes: 4 additions & 4 deletions libbeat/dashboards/dashboards/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ func (imp Importer) ImportJSONFile(fileType string, file string) error {
json.Unmarshal(reader, &jsonContent)
fileBase := strings.TrimSuffix(filepath.Base(file), filepath.Ext(file))

err = imp.client.LoadJSON(path+"/"+fileBase, jsonContent)
body, err := imp.client.LoadJSON(path+"/"+fileBase, jsonContent)
if err != nil {
return fmt.Errorf("Failed to load %s under %s/%s: %s", file, path, fileBase, err)
return fmt.Errorf("Failed to load %s under %s/%s: %s. Response body: %s", file, path, fileBase, err, body)
}

return nil
Expand Down Expand Up @@ -271,7 +271,7 @@ func (imp Importer) ImportSearch(file string) error {
path := "/" + imp.cfg.KibanaIndex + "/search/" + searchName
imp.statusMsg("Import search %s", file)

if err = imp.client.LoadJSON(path, searchContent); err != nil {
if _, err = imp.client.LoadJSON(path, searchContent); err != nil {
return err
}

Expand Down Expand Up @@ -301,7 +301,7 @@ func (imp Importer) ImportIndex(file string) error {
path := "/" + imp.cfg.KibanaIndex + "/index-pattern/" + indexName
imp.statusMsg("Import index to %s from %s\n", path, file)

if err = imp.client.LoadJSON(path, indexContent); err != nil {
if _, err = imp.client.LoadJSON(path, indexContent); err != nil {
return err
}
return nil
Expand Down
21 changes: 11 additions & 10 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,24 +584,24 @@ func (client *Client) PublishEvent(data outputs.Data) error {
func (client *Client) LoadTemplate(templateName string, template map[string]interface{}) error {

path := "/_template/" + templateName
err := client.LoadJSON(path, template)
body, err := client.LoadJSON(path, template)
if err != nil {
return fmt.Errorf("couldn't load template: %v", err)
return fmt.Errorf("couldn't load template: %v. Response body: %s", err, body)
}
logp.Info("Elasticsearch template with name '%s' loaded", templateName)
return nil
}

func (client *Client) LoadJSON(path string, json map[string]interface{}) error {
status, _, err := client.Request("PUT", path, "", nil, json)
func (client *Client) LoadJSON(path string, json map[string]interface{}) ([]byte, error) {
status, body, err := client.Request("PUT", path, "", nil, json)
if err != nil {
return fmt.Errorf("couldn't load json. Error: %s", err)
return body, fmt.Errorf("couldn't load json. Error: %s", err)
}
if status > 300 {
return fmt.Errorf("couldn't load json. Status: %v", status)
return body, fmt.Errorf("couldn't load json. Status: %v", status)
}

return nil
return body, nil
}

// CheckTemplate checks if a given template already exist. It returns true if
Expand Down Expand Up @@ -718,15 +718,16 @@ func (conn *Connection) execHTTPRequest(req *http.Request) (int, []byte, error)
defer closing(resp.Body)

status := resp.StatusCode
var retErr error
if status >= 300 {
return status, nil, fmt.Errorf("%v", resp.Status)
retErr = fmt.Errorf("%v", resp.Status)
}

obj, err := ioutil.ReadAll(resp.Body)
if err != nil {
return status, nil, err
return status, nil, retErr
}
return status, obj, nil
return status, obj, retErr
}

func closing(c io.Closer) {
Expand Down

0 comments on commit 96bb79b

Please sign in to comment.