diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index ee5bc4948b9..10d87190c57 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -547,6 +547,7 @@ https://github.com/elastic/beats/compare/v6.0.1...v6.1.0[View commits] - Add experimental Docker autodiscover functionality. {pull}5245[5245] - Add option to convert the timestamps to UTC in the system module. {pull}5647[5647] - Add Logstash module support for main log and the slow log, support the plain text or structured JSON format {pull}5481[5481] +- Support stored scripts {pull}5339[5339] *Metricbeat* diff --git a/filebeat/fileset/fileset.go b/filebeat/fileset/fileset.go index 8992b29a474..256805be648 100644 --- a/filebeat/fileset/fileset.go +++ b/filebeat/fileset/fileset.go @@ -23,13 +23,14 @@ import ( // Fileset struct is the representation of a fileset. type Fileset struct { - name string - mcfg *ModuleConfig - fcfg *FilesetConfig - modulePath string - manifest *manifest - vars map[string]interface{} - pipelineID string + name string + mcfg *ModuleConfig + fcfg *FilesetConfig + modulePath string + manifest *manifest + vars map[string]interface{} + pipelineID string + scriptIDTemplate *template.Template } // New allocates a new Fileset object with the given configuration. 
@@ -75,6 +76,11 @@ func (fs *Fileset) Read(beatVersion string) error { return err } + // if pipeline scripts are available generate a template for its ids + if len(fs.manifest.PipelineScripts) > 0 { + fs.scriptIDTemplate = fs.getScriptIDTemplate(beatVersion) + } + return nil } @@ -95,6 +101,7 @@ type manifest struct { Requires struct { Processors []ProcessorRequirement `config:"processors"` } `config:"requires"` + PipelineScripts []string `config:"pipeline_script"` } func newManifest(cfg *common.Config) (*manifest, error) { @@ -330,6 +337,37 @@ func (fs *Fileset) getInputConfig() (*common.Config, error) { return cfg, nil } +// GetScriptsToStore returns a map of scripts under ingest/script folder of fileset +// Keys of the returned map are the filename and the values are the source code of the script +func (fs *Fileset) GetScriptsToStore() (map[string]string, error) { + // return none if no scripts are available for the fileset + if len(fs.manifest.PipelineScripts) == 0 { + return nil, nil + } + + // read all scripts into a map + folder := filepath.Join(fs.modulePath, fs.name, "ingest", "script") + scripts := make(map[string]string) + for _, name := range fs.manifest.PipelineScripts { + scriptPath := filepath.Join(folder, name) + + var source []byte + source, err := ioutil.ReadFile(scriptPath) + if err != nil { + return nil, fmt.Errorf("Error while reading script %s for fileset %s: %v", name, fs.name, err) + } + scripts[name] = string(source[:]) + } + + return scripts, nil +} + +// getScriptIDTemplate returns the Ingest Node script ID template +func (fs *Fileset) getScriptIDTemplate(beatVersion string) *template.Template { + tStr := formatScriptIDTemplate(fs.mcfg.Module, fs.name, beatVersion) + return template.Must(template.New("script_id").Parse(tStr)) +} + // getPipelineID returns the Ingest Node pipeline ID func (fs *Fileset) getPipelineID(beatVersion string) (string, error) { path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline, false) @@ -362,13 
+400,49 @@ func (fs *Fileset) GetPipeline(esVersion string) (pipelineID string, content map return "", nil, fmt.Errorf("Error interpreting the template of the ingest pipeline: %v", err) } + for _, name := range fs.manifest.PipelineScripts { + jsonString, err = substituteScriptIDs(jsonString, name, fs.scriptIDTemplate) + if err != nil { + return "", nil, err + } + + } + err = json.Unmarshal([]byte(jsonString), &content) if err != nil { return "", nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err) } + return fs.pipelineID, content, nil } +func substituteScriptIDs(jsonString, name string, t *template.Template) (string, error) { + p := strings.Split(name, ".") + if len(p) != 2 { + return "", fmt.Errorf("Error substituting script ids: invalid script name.") + } + + scriptPipelinePattern := "\"script\": {\n \"id\": \"%s\"" + scriptElem := fmt.Sprintf(scriptPipelinePattern, p[0]) + + scriptID := bytes.NewBufferString("") + err := t.Execute(scriptID, p[0]) + if err != nil { + return "", err + } + + scriptElemFull := fmt.Sprintf(scriptPipelinePattern, scriptID.String()) + jsonString = strings.Replace(jsonString, scriptElem, scriptElemFull, -1) + + return jsonString, nil + +} + +// formatScriptIDTemplate generates the ID to be used for the pipeline script ID in Elasticsearch +func formatScriptIDTemplate(module, fileset, beatVersion string) string { + return fmt.Sprintf("filebeat-%s-%s-%s-{{.}}", beatVersion, module, fileset) +} + // formatPipelineID generates the ID to be used for the pipeline ID in Elasticsearch func formatPipelineID(module, fileset, path, beatVersion string) string { return fmt.Sprintf("filebeat-%s-%s-%s-%s", beatVersion, module, fileset, removeExt(filepath.Base(path))) diff --git a/filebeat/fileset/fileset_test.go b/filebeat/fileset/fileset_test.go index 55355218d9d..e2b7f2a5f55 100644 --- a/filebeat/fileset/fileset_test.go +++ b/filebeat/fileset/fileset_test.go @@ -31,6 +31,7 @@ func TestLoadManifestNginx(t *testing.T) { 
assert.Equal(t, manifest.ModuleVersion, "1.0") assert.Equal(t, manifest.IngestPipeline, "ingest/default.json") assert.Equal(t, manifest.Input, "config/nginx-access.yml") + assert.Equal(t, manifest.PipelineScripts, []string{"is-private-ip.painless"}) vars := manifest.Vars assert.Equal(t, "paths", vars[0]["name"]) diff --git a/filebeat/fileset/modules.go b/filebeat/fileset/modules.go index 35fcc225107..43e0dbfc099 100644 --- a/filebeat/fileset/modules.go +++ b/filebeat/fileset/modules.go @@ -1,6 +1,7 @@ package fileset import ( + "bytes" "encoding/json" "fmt" "io/ioutil" @@ -257,6 +258,105 @@ func (reg *ModuleRegistry) GetInputConfigs() ([]*common.Config, error) { return result, nil } +// PipelineLoader factory builds and returns a PipelineLoader +type PipelineLoaderFactory func() (PipelineLoader, error) + +// PipelineLoader is a subset of the Elasticsearch client API capable of loading +// the pipelines. +type PipelineLoader interface { + LoadJSON(path string, json map[string]interface{}) ([]byte, error) + Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error) + GetVersion() string +} + +// LoadPipelines loads the pipelines for each configured fileset. 
+func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader) error { + for module, filesets := range reg.registry { + for name, fileset := range filesets { + // check that all the required Ingest Node plugins are available + requiredProcessors := fileset.GetRequiredProcessors() + logp.Debug("modules", "Required processors: %s", requiredProcessors) + if len(requiredProcessors) > 0 { + err := checkAvailableProcessors(esClient, requiredProcessors) + if err != nil { + return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) + } + } + + scripts, err := fileset.GetScriptsToStore() + if err != nil { + return fmt.Errorf("Error getting scripts for fileset %s/%s: %v", module, name, err) + } + err = loadScripts(esClient, fileset, scripts) + if err != nil { + return fmt.Errorf("Error loading scripts for fileset %s/%s: %v", module, name, err) + } + + pipelineID, content, err := fileset.GetPipeline(esClient.GetVersion()) + if err != nil { + return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err) + } + err = loadPipeline(esClient, pipelineID, content) + if err != nil { + return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) + } + } + } + return nil +} + +func scriptPayload(fs *Fileset, name, source string) (common.MapStr, string, error) { + parts := strings.Split(name, ".") + if len(parts) != 2 { + return nil, "", fmt.Errorf("Invalid number of filename parts: %d (instead of 2)", len(parts)) + } + + if parts[1] != "painless" { + return nil, "", fmt.Errorf("Only painless scripts can be stored for pipelines") + } + + scriptID := bytes.NewBufferString("") + err := fs.scriptIDTemplate.Execute(scriptID, parts[0]) + if err != nil { + return nil, "", fmt.Errorf("error while generating id") + } + + url := strings.Join([]string{"/_scripts/", scriptID.String()}, "") + return common.MapStr{ + "script": common.MapStr{ + "lang": "painless", + "source": source, + }, + }, url, nil +} + +func 
loadScript(esClient PipelineLoader, fs *Fileset, name, source string) error { + p, url, err := scriptPayload(fs, name, source) + if err != nil { + return fmt.Errorf("Error adding script: %v", err) + } + + status, body, err := esClient.Request("POST", url, "", nil, p) + if err != nil { + return fmt.Errorf("Error adding script: %v", err) + } + if status > 299 { + return fmt.Errorf("Error adding script. Status: %d. Response body: %s", status, body) + } + logp.Info("Loaded script: %v %v", name, url) + return nil +} + +func loadScripts(esClient PipelineLoader, fs *Fileset, scripts map[string]string) error { + for name, source := range scripts { + err := loadScript(esClient, fs, name, source) + if err != nil { + return err + } + } + return nil +} + // InfoString returns the enabled modules and filesets in a single string, ready to // be shown to the user func (reg *ModuleRegistry) InfoString() string { diff --git a/filebeat/module/auditd/log/ingest/pipeline.json b/filebeat/module/auditd/log/ingest/pipeline.json index e2a97600389..05ca910d60c 100644 --- a/filebeat/module/auditd/log/ingest/pipeline.json +++ b/filebeat/module/auditd/log/ingest/pipeline.json @@ -78,8 +78,7 @@ }, { "script": { - "lang": "painless", - "inline": " String trimQuotes(def v) {\n if (v.startsWith(\"'\") || v.startsWith('\"')) {\n v = v.substring(1, v.length());\n }\n if (v.endsWith(\"'\") || v.endsWith('\"')) {\n v = v.substring(0, v.length()-1);\n } \n return v;\n }\n \n boolean isHexAscii(String v) {\n def len = v.length();\n if (len == 0 || len % 2 != 0) {\n return false; \n }\n \n for (int i = 0 ; i < len ; i++) {\n if (Character.digit(v.charAt(i), 16) == -1) {\n return false;\n }\n }\n\n return true;\n }\n \n String convertHexToString(String hex) {\n\t StringBuilder sb = new StringBuilder();\n\n for (int i=0; i < hex.length() - 1; i+=2) {\n String output = hex.substring(i, (i + 2));\n int decimal = Integer.parseInt(output, 16);\n sb.append((char)decimal);\n }\n\n return sb.toString();\n }\n \n 
def possibleHexKeys = ['exe', 'cmd'];\n \n def audit = ctx.auditd.get(\"log\");\n Iterator entries = audit.entrySet().iterator();\n while (entries.hasNext()) {\n def e = entries.next();\n def k = e.getKey();\n def v = e.getValue(); \n\n // Remove entries whose value is ?\n if (v == \"?\" || v == \"(null)\" || v == \"\") {\n entries.remove();\n continue;\n }\n \n // Convert hex values to ASCII.\n if (possibleHexKeys.contains(k) && isHexAscii(v)) {\n v = convertHexToString(v);\n audit.put(k, v);\n }\n \n // Trim quotes.\n if (v instanceof String) {\n v = trimQuotes(v);\n audit.put(k, v);\n }\n \n // Convert arch.\n if (k == \"arch\" && v == \"c000003e\") {\n audit.put(k, \"x86_64\");\n }\n }" + "id": "trim-quotes" } }, { diff --git a/filebeat/module/auditd/log/ingest/script/trim-quotes.painless b/filebeat/module/auditd/log/ingest/script/trim-quotes.painless new file mode 100644 index 00000000000..d86c4bdfeba --- /dev/null +++ b/filebeat/module/auditd/log/ingest/script/trim-quotes.painless @@ -0,0 +1,69 @@ +String trimQuotes(def v) { + if (v.startsWith("'") || v.startsWith('"')) { + v = v.substring(1, v.length()); + } + if (v.endsWith("'") || v.endsWith('"')) { + v = v.substring(0, v.length()-1); + } + return v; +} + +boolean isHexAscii(String v) { + def len = v.length(); + if (len == 0 || len % 2 != 0) { + return false; + } + + for (int i = 0 ; i < len ; i++) { + if (Character.digit(v.charAt(i), 16) == -1) { + return false; + } + } + + return true; +} + +String convertHexToString(String hex) { + StringBuilder sb = new StringBuilder(); + + for (int i=0; i < hex.length() - 1; i+=2) { + String output = hex.substring(i, (i + 2)); + int decimal = Integer.parseInt(output, 16); + sb.append((char)decimal); + } + + return sb.toString(); +} + +def possibleHexKeys = ['exe', 'cmd']; + +def audit = ctx.auditd.get("log"); +Iterator entries = audit.entrySet().iterator(); +while (entries.hasNext()) { + def e = entries.next(); + def k = e.getKey(); + def v = e.getValue(); + + 
// Remove entries whose value is ? + if (v == "?" || v == "(null)" || v == "") { + entries.remove(); + continue; + } + + // Convert hex values to ASCII. + if (possibleHexKeys.contains(k) && isHexAscii(v)) { + v = convertHexToString(v); + audit.put(k, v); + } + + // Trim quotes. + if (v instanceof String) { + v = trimQuotes(v); + audit.put(k, v); + } + + // Convert arch. + if (k == "arch" && v == "c000003e") { + audit.put(k, "x86_64"); + } +} diff --git a/filebeat/module/auditd/log/manifest.yml b/filebeat/module/auditd/log/manifest.yml index 99ff50e73ac..4218ca3c3f7 100644 --- a/filebeat/module/auditd/log/manifest.yml +++ b/filebeat/module/auditd/log/manifest.yml @@ -10,6 +10,9 @@ var: ingest_pipeline: ingest/pipeline.json input: config/log.yml +pipeline_script: +- "trim-quotes.painless" + requires.processors: - name: geoip plugin: ingest-geoip diff --git a/filebeat/module/nginx/access/ingest/default.json b/filebeat/module/nginx/access/ingest/default.json index 04ae1197e69..dc3f060fb76 100644 --- a/filebeat/module/nginx/access/ingest/default.json +++ b/filebeat/module/nginx/access/ingest/default.json @@ -31,8 +31,7 @@ } }, { "script": { - "lang": "painless", - "inline": "boolean isPrivate(def ip) { try { StringTokenizer tok = new StringTokenizer(ip, '.'); int firstByte = Integer.parseInt(tok.nextToken()); int secondByte = Integer.parseInt(tok.nextToken()); if (firstByte == 10) { return true; } if (firstByte == 192 && secondByte == 168) { return true; } if (firstByte == 172 && secondByte >= 16 && secondByte <= 31) { return true; } if (firstByte == 127) { return true; } return false; } catch (Exception e) { return false; } } def found = false; for (def item : ctx.nginx.access.remote_ip_list) { if (!isPrivate(item)) { ctx.nginx.access.remote_ip = item; found = true; break; } } if (!found) { ctx.nginx.access.remote_ip = ctx.nginx.access.remote_ip_list[0]; }" + "id": "is-private-ip" } }, { "remove":{ diff --git 
a/filebeat/module/nginx/access/ingest/script/is-private-ip.painless b/filebeat/module/nginx/access/ingest/script/is-private-ip.painless new file mode 100644 index 00000000000..fb675240a4c --- /dev/null +++ b/filebeat/module/nginx/access/ingest/script/is-private-ip.painless @@ -0,0 +1,34 @@ +boolean isPrivate(def ip) { + try { + StringTokenizer tok = new StringTokenizer(ip, '.'); + int firstByte = Integer.parseInt(tok.nextToken()); + int secondByte = Integer.parseInt(tok.nextToken()); + if (firstByte == 10) { + return true; + } + if (firstByte == 192 && secondByte == 168) { + return true; + } + if (firstByte == 172 && secondByte >= 16 && secondByte <= 31) { + return true; + } + if (firstByte == 127) { + return true; + } + return false; + } catch (Exception e) { + return false; + } +} + +def found = false; +for (def item : ctx.nginx.access.remote_ip_list) { + if (!isPrivate(item)) { + ctx.nginx.access.remote_ip = item; + found = true; + break; + } +} +if (!found) { + ctx.nginx.access.remote_ip = ctx.nginx.access.remote_ip_list[0]; +} diff --git a/filebeat/module/nginx/access/manifest.yml b/filebeat/module/nginx/access/manifest.yml index a0fede4ed51..349d814e073 100644 --- a/filebeat/module/nginx/access/manifest.yml +++ b/filebeat/module/nginx/access/manifest.yml @@ -34,6 +34,9 @@ machine_learning: datafeed: machine_learning/datafeed_visitor_rate.json min_version: 5.5.0 +pipeline_script: +- is-private-ip.painless + requires.processors: - name: user_agent plugin: ingest-user-agent diff --git a/filebeat/module/redis/log/ingest/pipeline.json b/filebeat/module/redis/log/ingest/pipeline.json index 5e61b64be40..d3b49ef8929 100644 --- a/filebeat/module/redis/log/ingest/pipeline.json +++ b/filebeat/module/redis/log/ingest/pipeline.json @@ -15,13 +15,11 @@ } }, { "script": { - "lang": "painless", - "inline": "if (ctx.redis.log.level == '.') {\n ctx.redis.log.level = 'debug';\n } else if (ctx.redis.log.level == '-') {\n ctx.redis.log.level = 'verbose';\n } else if 
(ctx.redis.log.level == '*') {\n ctx.redis.log.level = 'notice';\n } else if (ctx.redis.log.level == '#') {\n ctx.redis.log.level = 'warning';\n }" + "id": "set-log-level" } }, { "script": { - "lang": "painless", - "inline": "if (ctx.redis.log.role == 'M') {\n ctx.redis.log.role = 'master';\n } else if (ctx.redis.log.role == 'S') {\n ctx.redis.log.role = 'slave';\n } else if (ctx.redis.log.role == 'C') {\n ctx.redis.log.role = 'child';\n } else if (ctx.redis.log.role == 'X') {\n ctx.redis.log.role = 'sentinel';\n }\n " + "id": "set-log-role" } }, { "remove": { diff --git a/filebeat/module/redis/log/ingest/script/set-log-level.painless b/filebeat/module/redis/log/ingest/script/set-log-level.painless new file mode 100644 index 00000000000..d99c3d41709 --- /dev/null +++ b/filebeat/module/redis/log/ingest/script/set-log-level.painless @@ -0,0 +1,9 @@ +if (ctx.redis.log.level == '.') { + ctx.redis.log.level = 'debug'; +} else if (ctx.redis.log.level == '-') { + ctx.redis.log.level = 'verbose'; +} else if (ctx.redis.log.level == '*') { + ctx.redis.log.level = 'notice'; +} else if (ctx.redis.log.level == '#') { + ctx.redis.log.level = 'warning'; +} diff --git a/filebeat/module/redis/log/ingest/script/set-log-role.painless b/filebeat/module/redis/log/ingest/script/set-log-role.painless new file mode 100644 index 00000000000..e0f40da2fb7 --- /dev/null +++ b/filebeat/module/redis/log/ingest/script/set-log-role.painless @@ -0,0 +1,9 @@ +if (ctx.redis.log.role == 'M') { + ctx.redis.log.role = 'master'; +} else if (ctx.redis.log.role == 'S') { + ctx.redis.log.role = 'slave'; +} else if (ctx.redis.log.role == 'C') { + ctx.redis.log.role = 'child'; +} else if (ctx.redis.log.role == 'X') { + ctx.redis.log.role = 'sentinel'; +} diff --git a/filebeat/module/redis/log/manifest.yml b/filebeat/module/redis/log/manifest.yml index 3c63a894c28..441b20674d0 100644 --- a/filebeat/module/redis/log/manifest.yml +++ b/filebeat/module/redis/log/manifest.yml @@ -12,3 +12,7 @@ var: 
ingest_pipeline: ingest/pipeline.json input: config/log.yml + +pipeline_script: +- "set-log-level.painless" +- "set-log-role.painless"