Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use local timezone for TZ conversion in the FB system module #5647

Merged
merged 6 commits into from
Nov 21, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 70 additions & 13 deletions filebeat/fileset/fileset.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"text/template"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
mlimporter "github.com/elastic/beats/libbeat/ml-importer"
)

Expand Down Expand Up @@ -155,18 +156,58 @@ func (fs *Fileset) evaluateVars() (map[string]interface{}, error) {
return vars, nil
}

// turnOffElasticsearchVars re-evaluates the variables that have `min_elasticsearch_version`
// set.
func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersion string) (map[string]interface{}, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice feature.


retVars := map[string]interface{}{}
for key, val := range vars {
retVars[key] = val
}

haveVersion, err := common.NewVersion(esVersion)
if err != nil {
return vars, fmt.Errorf("Error parsing version %s: %v", esVersion, err)
}

for _, vals := range fs.manifest.Vars {
var exists bool
name, exists := vals["name"].(string)
if !exists {
return nil, fmt.Errorf("Variable doesn't have a string 'name' key")
Copy link
Member

@andrewkroh andrewkroh Nov 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure it matters, but !exists in this case is telling you that either "name" does not exist or name is not a string. And so maybe calling the variable ok would be better since it allows more ambiguity.

Same comment for the other locations where the type assertion and map lookup signals are coalesced.

}

minESVersion, exists := vals["min_elasticsearch_version"].(map[string]interface{})
if exists {
minVersion, err := common.NewVersion(minESVersion["version"].(string))
if err != nil {
return vars, fmt.Errorf("Error parsing version %s: %v", minESVersion["version"].(string), err)
}

logp.Debug("fileset", "Comparing ES version %s with %s", haveVersion, minVersion)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest "Comparing ES version %s with requirement of %s" to make the meaning of the two versions more clear.


if haveVersion.LessThan(minVersion) {
retVars[name] = minESVersion["value"]
logp.Info("Setting var %s to %v because Elasticsearch version is %s", name, minESVersion["value"], haveVersion)
}
}
}

return retVars, nil
}

// resolveVariable considers the value as a template so it can refer to built-in variables
// as well as other variables defined before them.
func resolveVariable(vars map[string]interface{}, value interface{}) (interface{}, error) {
switch v := value.(type) {
case string:
return applyTemplate(vars, v)
return applyTemplate(vars, v, false)
case []interface{}:
transformed := []interface{}{}
for _, val := range v {
s, ok := val.(string)
if ok {
transf, err := applyTemplate(vars, s)
transf, err := applyTemplate(vars, s, false)
if err != nil {
return nil, fmt.Errorf("array: %v", err)
}
Expand All @@ -180,9 +221,15 @@ func resolveVariable(vars map[string]interface{}, value interface{}) (interface{
return value, nil
}

// applyTemplate applies a Golang text/template
func applyTemplate(vars map[string]interface{}, templateString string) (string, error) {
tpl, err := template.New("text").Parse(templateString)
// applyTemplate applies a Golang text/template. If specialDelims is set to true,
// the delimiters are set to `{%` and `%}` instead of `{{` and `}}`. These are easier to use
// in pipeline definitions.
func applyTemplate(vars map[string]interface{}, templateString string, specialDelims bool) (string, error) {
tpl := template.New("text")
if specialDelims {
tpl = tpl.Delims("{%", "%}")
}
tpl, err := tpl.Parse(templateString)
if err != nil {
return "", fmt.Errorf("Error parsing template %s: %v", templateString, err)
}
Expand Down Expand Up @@ -215,7 +262,7 @@ func (fs *Fileset) getBuiltinVars() (map[string]interface{}, error) {
}

func (fs *Fileset) getProspectorConfig() (*common.Config, error) {
path, err := applyTemplate(fs.vars, fs.manifest.Prospector)
path, err := applyTemplate(fs.vars, fs.manifest.Prospector, false)
if err != nil {
return nil, fmt.Errorf("Error expanding vars on the prospector path: %v", err)
}
Expand All @@ -224,7 +271,7 @@ func (fs *Fileset) getProspectorConfig() (*common.Config, error) {
return nil, fmt.Errorf("Error reading prospector file %s: %v", path, err)
}

yaml, err := applyTemplate(fs.vars, string(contents))
yaml, err := applyTemplate(fs.vars, string(contents), false)
if err != nil {
return nil, fmt.Errorf("Error interpreting the template of the prospector: %v", err)
}
Expand Down Expand Up @@ -269,27 +316,37 @@ func (fs *Fileset) getProspectorConfig() (*common.Config, error) {

// getPipelineID returns the Ingest Node pipeline ID
func (fs *Fileset) getPipelineID(beatVersion string) (string, error) {
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline)
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline, false)
if err != nil {
return "", fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err)
}

return formatPipelineID(fs.mcfg.Module, fs.name, path, beatVersion), nil
}

func (fs *Fileset) GetPipeline() (pipelineID string, content map[string]interface{}, err error) {
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline)
func (fs *Fileset) GetPipeline(esVersion string) (pipelineID string, content map[string]interface{}, err error) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Fileset.GetPipeline should have comment or be unexported


path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline, false)
if err != nil {
return "", nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err)
}

f, err := os.Open(filepath.Join(fs.modulePath, fs.name, path))
strContents, err := ioutil.ReadFile(filepath.Join(fs.modulePath, fs.name, path))
if err != nil {
return "", nil, fmt.Errorf("Error reading pipeline file %s: %v", path, err)
}

dec := json.NewDecoder(f)
err = dec.Decode(&content)
vars, err := fs.turnOffElasticsearchVars(fs.vars, esVersion)
if err != nil {
return "", nil, err
}

jsonString, err := applyTemplate(vars, string(strContents), true)
if err != nil {
return "", nil, fmt.Errorf("Error interpreting the template of the ingest pipeline: %v", err)
}

err = json.Unmarshal([]byte(jsonString), &content)
if err != nil {
return "", nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err)
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/fileset/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader) error {
}
}

pipelineID, content, err := fileset.GetPipeline()
pipelineID, content, err := fileset.GetPipeline(esClient.GetVersion())
if err != nil {
return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err)
}
Expand Down
4 changes: 4 additions & 0 deletions filebeat/module/system/syslog/config/syslog.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ exclude_files: [".gz$"]
multiline:
pattern: "^\\s"
match: after
{{ if .convert_timezone }}
processors:
- add_locale: ~
{{ end }}
5 changes: 3 additions & 2 deletions filebeat/module/system/syslog/ingest/pipeline.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@
"field": "message"
}
},
{
{
"date": {
"field": "system.syslog.timestamp",
"target_field": "@timestamp",
"formats": [
"MMM d HH:mm:ss",
"MMM dd HH:mm:ss"
],
{% if .convert_timezone %}"timezone": "{{ beat.timezone }}",{% end %}
"ignore_failure": true
}
}
}
],
"on_failure" : [{
"set" : {
Expand Down
7 changes: 7 additions & 0 deletions filebeat/module/system/syslog/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ var:
os.darwin:
- /var/log/system.log*
os.windows: []
- name: convert_timezone
default: false
# if ES < 6.1.0, this flag switches to false automatically when evaluating the
# pipeline
min_elasticsearch_version:
version: 6.1.0
value: false

ingest_pipeline: ingest/pipeline.json
prospector: config/syslog.yml
6 changes: 3 additions & 3 deletions testing/environments/latest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
version: '2.1'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:6.0.0-rc2
image: docker.elastic.co/elasticsearch/elasticsearch:6.0.0
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9200"]
environment:
Expand All @@ -18,13 +18,13 @@ services:
context: docker/logstash
dockerfile: Dockerfile
args:
ELASTIC_VERSION: 6.0.0-rc2
ELASTIC_VERSION: 6.0.0
DOWNLOAD_URL: https://artifacts.elastic.co/downloads
environment:
- ES_HOST=elasticsearch

kibana:
image: docker.elastic.co/kibana/kibana:6.0.0-rc2
image: docker.elastic.co/kibana/kibana:6.0.0
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:5601"]
retries: 6