Support stored scripts #5339

Closed
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -547,6 +547,7 @@ https://github.com/elastic/beats/compare/v6.0.1...v6.1.0[View commits]
- Add experimental Docker autodiscover functionality. {pull}5245[5245]
- Add option to convert the timestamps to UTC in the system module. {pull}5647[5647]
- Add Logstash module support for main log and the slow log, support the plain text or structured JSON format {pull}5481[5481]
- Support stored scripts {pull}5339[5339]

*Metricbeat*

88 changes: 81 additions & 7 deletions filebeat/fileset/fileset.go
@@ -23,13 +23,14 @@ import (

// Fileset struct is the representation of a fileset.
type Fileset struct {
	name             string
	mcfg             *ModuleConfig
	fcfg             *FilesetConfig
	modulePath       string
	manifest         *manifest
	vars             map[string]interface{}
	pipelineID       string
	scriptIDTemplate *template.Template
}

// New allocates a new Fileset object with the given configuration.
@@ -75,6 +76,11 @@ func (fs *Fileset) Read(beatVersion string) error {
		return err
	}

	// If pipeline scripts are available, generate a template for their IDs.
	if len(fs.manifest.PipelineScripts) > 0 {
		fs.scriptIDTemplate = fs.getScriptIDTemplate(beatVersion)
	}

	return nil
}

@@ -95,6 +101,7 @@ type manifest struct {
	Requires struct {
		Processors []ProcessorRequirement `config:"processors"`
	} `config:"requires"`
	PipelineScripts []string `config:"pipeline_script"`
}

func newManifest(cfg *common.Config) (*manifest, error) {
@@ -330,6 +337,37 @@ func (fs *Fileset) getInputConfig() (*common.Config, error) {
	return cfg, nil
}

// GetScriptsToStore returns a map of the scripts under the fileset's ingest/script folder.
// The keys of the returned map are the file names and the values are the source code of the scripts.
func (fs *Fileset) GetScriptsToStore() (map[string]string, error) {
	// Return nothing if no scripts are available for the fileset.
	if len(fs.manifest.PipelineScripts) == 0 {
		return nil, nil
	}

	// Read all scripts into a map.
	folder := filepath.Join(fs.modulePath, fs.name, "ingest", "script")
	scripts := make(map[string]string)
	for _, name := range fs.manifest.PipelineScripts {
		scriptPath := filepath.Join(folder, name)

		source, err := ioutil.ReadFile(scriptPath)
		if err != nil {
			return nil, fmt.Errorf("Error while reading script %s for fileset %s: %v", name, fs.name, err)
		}
		scripts[name] = string(source)
	}

	return scripts, nil
}
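
A hedged usage sketch of GetScriptsToStore; the helper name and the debug output are illustrative and not part of this change. For the nginx/access fileset touched below, the returned map holds a single "is-private-ip.painless" entry whose value is the script source.

// printFilesetScripts is illustrative only: it shows how the returned map is keyed.
func printFilesetScripts(fs *Fileset) error {
	scripts, err := fs.GetScriptsToStore()
	if err != nil {
		return err
	}
	for name, source := range scripts {
		// e.g. "is-private-ip.painless" for nginx/access
		fmt.Printf("%s: %d bytes\n", name, len(source))
	}
	return nil
}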

// getScriptIDTemplate returns the Ingest Node script ID template
func (fs *Fileset) getScriptIDTemplate(beatVersion string) *template.Template {
	tStr := formatScriptIDTemplate(fs.mcfg.Module, fs.name, beatVersion)
	return template.Must(template.New("script_id").Parse(tStr))
}

// getPipelineID returns the Ingest Node pipeline ID
func (fs *Fileset) getPipelineID(beatVersion string) (string, error) {
	path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline, false)
@@ -362,13 +400,49 @@ func (fs *Fileset) GetPipeline(esVersion string) (pipelineID string, content map[string]interface{}, err error) {
return "", nil, fmt.Errorf("Error interpreting the template of the ingest pipeline: %v", err)
}

for _, name := range fs.manifest.PipelineScripts {
jsonString, err = substituteScriptIDs(jsonString, name, fs.scriptIDTemplate)
if err != nil {
return "", nil, err
}

}

err = json.Unmarshal([]byte(jsonString), &content)
if err != nil {
return "", nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err)
}

return fs.pipelineID, content, nil
}

func substituteScriptIDs(jsonString, name string, t *template.Template) (string, error) {
	p := strings.Split(name, ".")
	if len(p) != 2 {
		return "", fmt.Errorf("Error substituting script ids: invalid script name.")
	}

Review comment: error strings should not be capitalized or end with punctuation or a newline

	scriptPipelinePattern := "\"script\": {\n \"id\": \"%s\""
	scriptElem := fmt.Sprintf(scriptPipelinePattern, p[0])

	scriptID := bytes.NewBufferString("")
	err := t.Execute(scriptID, p[0])
	if err != nil {
		return "", err
	}

	scriptElemFull := fmt.Sprintf(scriptPipelinePattern, scriptID.String())
	jsonString = strings.Replace(jsonString, scriptElem, scriptElemFull, -1)

Review comment (Contributor): Using string replacement for this seems brittle. Alternatively we could go through the JSON keys and look for the script tags. I think we do something like that for the ML job ID replacement. This gets more complicated if we look into anything but the top level processors, but I think that's all we need for the moment? What do you think?

	return jsonString, nil
}
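
As a possible follow-up to the review comment above, a hedged sketch of the JSON-walking approach: decode the pipeline first, then rewrite script IDs on the top-level processors. The names rewriteScriptIDs and idMap are illustrative and not part of this change.

func rewriteScriptIDs(content map[string]interface{}, idMap map[string]string) {
	processors, ok := content["processors"].([]interface{})
	if !ok {
		return
	}
	for _, p := range processors {
		proc, ok := p.(map[string]interface{})
		if !ok {
			continue
		}
		script, ok := proc["script"].(map[string]interface{})
		if !ok {
			continue
		}
		if id, ok := script["id"].(string); ok {
			// idMap maps the short script name (e.g. "is-private-ip") to the
			// versioned stored-script ID generated from the template.
			if newID, found := idMap[id]; found {
				script["id"] = newID
			}
		}
	}
}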

// formatScriptIDTemplate generates the ID template used for stored pipeline script IDs in Elasticsearch
func formatScriptIDTemplate(module, fileset, beatVersion string) string {
	return fmt.Sprintf("filebeat-%s-%s-%s-{{.}}", beatVersion, module, fileset)
}
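
For concreteness, a hedged example of the IDs this template yields; the helper, the 6.1.0 version string, and the nginx/access values are illustrative rather than taken from the diff.

// exampleScriptID is illustrative only; it is not part of this change.
func exampleScriptID() string {
	// "filebeat-6.1.0-nginx-access-{{.}}"
	tStr := formatScriptIDTemplate("nginx", "access", "6.1.0")
	t := template.Must(template.New("script_id").Parse(tStr))
	var id bytes.Buffer
	if err := t.Execute(&id, "is-private-ip"); err != nil {
		return ""
	}
	// "filebeat-6.1.0-nginx-access-is-private-ip"
	return id.String()
}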

// formatPipelineID generates the ID to be used for the pipeline ID in Elasticsearch
func formatPipelineID(module, fileset, path, beatVersion string) string {
	return fmt.Sprintf("filebeat-%s-%s-%s-%s", beatVersion, module, fileset, removeExt(filepath.Base(path)))
1 change: 1 addition & 0 deletions filebeat/fileset/fileset_test.go
@@ -31,6 +31,7 @@ func TestLoadManifestNginx(t *testing.T) {
	assert.Equal(t, manifest.ModuleVersion, "1.0")
	assert.Equal(t, manifest.IngestPipeline, "ingest/default.json")
	assert.Equal(t, manifest.Input, "config/nginx-access.yml")
	assert.Equal(t, manifest.PipelineScripts, []string{"is-private-ip.painless"})

	vars := manifest.Vars
	assert.Equal(t, "paths", vars[0]["name"])
100 changes: 100 additions & 0 deletions filebeat/fileset/modules.go
@@ -1,6 +1,7 @@
package fileset

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
@@ -257,6 +258,105 @@ func (reg *ModuleRegistry) GetInputConfigs() ([]*common.Config, error) {
	return result, nil
}

// PipelineLoader factory builds and returns a PipelineLoader

Review comment: comment on exported type PipelineLoaderFactory should be of the form "PipelineLoaderFactory ..." (with optional leading article)

type PipelineLoaderFactory func() (PipelineLoader, error)

// PipelineLoader is a subset of the Elasticsearch client API capable of loading
// the pipelines.
type PipelineLoader interface {
	LoadJSON(path string, json map[string]interface{}) ([]byte, error)
	Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error)
	GetVersion() string
}

// LoadPipelines loads the pipelines for each configured fileset.
func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader) error {
	for module, filesets := range reg.registry {
		for name, fileset := range filesets {
			// Check that all the required Ingest Node plugins are available.
			requiredProcessors := fileset.GetRequiredProcessors()
			logp.Debug("modules", "Required processors: %s", requiredProcessors)
			if len(requiredProcessors) > 0 {
				err := checkAvailableProcessors(esClient, requiredProcessors)
				if err != nil {
					return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err)
				}
			}

			scripts, err := fileset.GetScriptsToStore()
			if err != nil {
				return fmt.Errorf("Error getting scripts for fileset %s/%s: %v", module, name, err)
			}
			err = loadScripts(esClient, fileset, scripts)
			if err != nil {
				return fmt.Errorf("Error loading scripts for fileset %s/%s: %v", module, name, err)
			}

			pipelineID, content, err := fileset.GetPipeline(esClient.GetVersion())
			if err != nil {
				return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err)
			}
			err = loadPipeline(esClient, pipelineID, content)
			if err != nil {
				return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err)
			}
		}
	}
	return nil
}
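
A minimal sketch of how LoadPipelines can be exercised against the PipelineLoader interface, assuming an in-memory stub; fakeLoader and loadAll are illustrative names that do not exist in this change.

// fakeLoader satisfies PipelineLoader without talking to Elasticsearch.
type fakeLoader struct{}

func (fakeLoader) LoadJSON(path string, json map[string]interface{}) ([]byte, error) {
	return nil, nil
}

func (fakeLoader) Request(method, path, pipeline string, params map[string]string, body interface{}) (int, []byte, error) {
	// Pretend every stored-script and pipeline request succeeded.
	return 200, []byte("{}"), nil
}

func (fakeLoader) GetVersion() string { return "6.1.0" }

// loadAll shows the intended ordering: scripts are stored before the pipelines
// that reference them, which LoadPipelines already handles per fileset.
func loadAll(reg *ModuleRegistry) error {
	return reg.LoadPipelines(fakeLoader{})
}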

func scriptPayload(fs *Fileset, name, source string) (common.MapStr, string, error) {
	parts := strings.Split(name, ".")
	if len(parts) != 2 {
		return nil, "", fmt.Errorf("Invalid number of filename parts: %d (instead of 2)", len(parts))
	}

	if parts[1] != "painless" {
		return nil, "", fmt.Errorf("Only painless scripts can be stored for pipelines")
	}

	scriptID := bytes.NewBufferString("")
	err := fs.scriptIDTemplate.Execute(scriptID, parts[0])
	if err != nil {
		return nil, "", fmt.Errorf("error while generating id")
	}

	url := strings.Join([]string{"/_scripts/", scriptID.String()}, "")
	return common.MapStr{
		"script": common.MapStr{
			"lang":   "painless",
			"source": source,
		},
	}, url, nil
}
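
A hedged illustration of what scriptPayload builds for the nginx/access script; the helper name and the 6.1.0 version in the comments are assumptions, not part of the diff.

// describeScriptPayload is illustrative only.
func describeScriptPayload(fs *Fileset, source string) {
	payload, url, err := scriptPayload(fs, "is-private-ip.painless", source)
	if err != nil {
		return
	}
	// With a 6.1.0 nginx/access fileset, url is
	// "/_scripts/filebeat-6.1.0-nginx-access-is-private-ip" and payload is
	// common.MapStr{"script": common.MapStr{"lang": "painless", "source": source}}.
	fmt.Println(url, payload)
}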

func loadScript(esClient PipelineLoader, fs *Fileset, name, source string) error {
	p, url, err := scriptPayload(fs, name, source)
	if err != nil {
		return fmt.Errorf("Error adding script: %v", err)
	}

	status, body, err := esClient.Request("POST", url, "", nil, p)
	if err != nil {
		return fmt.Errorf("Error adding script: %v", err)
	}
	if status > 299 {
		return fmt.Errorf("Error adding script. Status: %d. Response body: %s", status, body)
	}
	logp.Info("Loaded script: %v %v", name, url)
	return nil
}

func loadScripts(esClient PipelineLoader, fs *Fileset, scripts map[string]string) error {
	for name, source := range scripts {
		err := loadScript(esClient, fs, name, source)
		if err != nil {
			return err
		}
	}
	return nil
}

// InfoString returns the enabled modules and filesets in a single string, ready to
// be shown to the user
func (reg *ModuleRegistry) InfoString() string {
3 changes: 1 addition & 2 deletions filebeat/module/auditd/log/ingest/pipeline.json
@@ -78,8 +78,7 @@
},
{
"script": {
"lang": "painless",
"inline": " String trimQuotes(def v) {\n if (v.startsWith(\"'\") || v.startsWith('\"')) {\n v = v.substring(1, v.length());\n }\n if (v.endsWith(\"'\") || v.endsWith('\"')) {\n v = v.substring(0, v.length()-1);\n } \n return v;\n }\n \n boolean isHexAscii(String v) {\n def len = v.length();\n if (len == 0 || len % 2 != 0) {\n return false; \n }\n \n for (int i = 0 ; i < len ; i++) {\n if (Character.digit(v.charAt(i), 16) == -1) {\n return false;\n }\n }\n\n return true;\n }\n \n String convertHexToString(String hex) {\n\t StringBuilder sb = new StringBuilder();\n\n for (int i=0; i < hex.length() - 1; i+=2) {\n String output = hex.substring(i, (i + 2));\n int decimal = Integer.parseInt(output, 16);\n sb.append((char)decimal);\n }\n\n return sb.toString();\n }\n \n def possibleHexKeys = ['exe', 'cmd'];\n \n def audit = ctx.auditd.get(\"log\");\n Iterator entries = audit.entrySet().iterator();\n while (entries.hasNext()) {\n def e = entries.next();\n def k = e.getKey();\n def v = e.getValue(); \n\n // Remove entries whose value is ?\n if (v == \"?\" || v == \"(null)\" || v == \"\") {\n entries.remove();\n continue;\n }\n \n // Convert hex values to ASCII.\n if (possibleHexKeys.contains(k) && isHexAscii(v)) {\n v = convertHexToString(v);\n audit.put(k, v);\n }\n \n // Trim quotes.\n if (v instanceof String) {\n v = trimQuotes(v);\n audit.put(k, v);\n }\n \n // Convert arch.\n if (k == \"arch\" && v == \"c000003e\") {\n audit.put(k, \"x86_64\");\n }\n }"
"id": "trim-quotes"
}
},
{
69 changes: 69 additions & 0 deletions filebeat/module/auditd/log/ingest/script/trim-quotes.painless
@@ -0,0 +1,69 @@
String trimQuotes(def v) {
    if (v.startsWith("'") || v.startsWith('"')) {
        v = v.substring(1, v.length());
    }
    if (v.endsWith("'") || v.endsWith('"')) {
        v = v.substring(0, v.length()-1);
    }
    return v;
}

boolean isHexAscii(String v) {
    def len = v.length();
    if (len == 0 || len % 2 != 0) {
        return false;
    }

    for (int i = 0 ; i < len ; i++) {
        if (Character.digit(v.charAt(i), 16) == -1) {
            return false;
        }
    }

    return true;
}

String convertHexToString(String hex) {
    StringBuilder sb = new StringBuilder();

    for (int i=0; i < hex.length() - 1; i+=2) {
        String output = hex.substring(i, (i + 2));
        int decimal = Integer.parseInt(output, 16);
        sb.append((char)decimal);
    }

    return sb.toString();
}

def possibleHexKeys = ['exe', 'cmd'];

def audit = ctx.auditd.get("log");
Iterator entries = audit.entrySet().iterator();
while (entries.hasNext()) {
    def e = entries.next();
    def k = e.getKey();
    def v = e.getValue();

    // Remove entries whose value is ?
    if (v == "?" || v == "(null)" || v == "") {
        entries.remove();
        continue;
    }

    // Convert hex values to ASCII.
    if (possibleHexKeys.contains(k) && isHexAscii(v)) {
        v = convertHexToString(v);
        audit.put(k, v);
    }

    // Trim quotes.
    if (v instanceof String) {
        v = trimQuotes(v);
        audit.put(k, v);
    }

    // Convert arch.
    if (k == "arch" && v == "c000003e") {
        audit.put(k, "x86_64");
    }
}
3 changes: 3 additions & 0 deletions filebeat/module/auditd/log/manifest.yml
@@ -10,6 +10,9 @@ var:
ingest_pipeline: ingest/pipeline.json
input: config/log.yml

pipeline_script:
- "trim-quotes.painless"

requires.processors:
- name: geoip
  plugin: ingest-geoip
3 changes: 1 addition & 2 deletions filebeat/module/nginx/access/ingest/default.json
@@ -31,8 +31,7 @@
}
}, {
"script": {
"lang": "painless",
"inline": "boolean isPrivate(def ip) { try { StringTokenizer tok = new StringTokenizer(ip, '.'); int firstByte = Integer.parseInt(tok.nextToken()); int secondByte = Integer.parseInt(tok.nextToken()); if (firstByte == 10) { return true; } if (firstByte == 192 && secondByte == 168) { return true; } if (firstByte == 172 && secondByte >= 16 && secondByte <= 31) { return true; } if (firstByte == 127) { return true; } return false; } catch (Exception e) { return false; } } def found = false; for (def item : ctx.nginx.access.remote_ip_list) { if (!isPrivate(item)) { ctx.nginx.access.remote_ip = item; found = true; break; } } if (!found) { ctx.nginx.access.remote_ip = ctx.nginx.access.remote_ip_list[0]; }"
"id": "is-private-ip"
}
}, {
"remove":{
34 changes: 34 additions & 0 deletions filebeat/module/nginx/access/ingest/script/is-private-ip.painless
@@ -0,0 +1,34 @@
boolean isPrivate(def ip) {
    try {
        StringTokenizer tok = new StringTokenizer(ip, '.');
        int firstByte = Integer.parseInt(tok.nextToken());
        int secondByte = Integer.parseInt(tok.nextToken());
        if (firstByte == 10) {
            return true;
        }
        if (firstByte == 192 && secondByte == 168) {
            return true;
        }
        if (firstByte == 172 && secondByte >= 16 && secondByte <= 31) {
            return true;
        }
        if (firstByte == 127) {
            return true;
        }
        return false;
    } catch (Exception e) {
        return false;
    }
}

def found = false;
for (def item : ctx.nginx.access.remote_ip_list) {
    if (!isPrivate(item)) {
        ctx.nginx.access.remote_ip = item;
        found = true;
        break;
    }
}
if (!found) {
    ctx.nginx.access.remote_ip = ctx.nginx.access.remote_ip_list[0];
}