Skip to content

Commit

Permalink
Use local timezone for TZ conversion in the FB system module
Browse files Browse the repository at this point in the history
This adds a `convert_timezone` fileset parameter that, when enabled,
does two things:

* Uses the `add_locale` processor in the FB proespector config
* Uses `{{ beat.timezone }}` as the `timezone` parameter for the
  date processor in the Ingest Node pipeline. This parameter accepts
  templates starting with ES 6.1.

For the moment the `convert_timezone` flag is off by default, to keep
backwards compatibility and because it results in an error when used
with ES < 6.1.

Closes elastic#3898.

For now this is only applied to the system module, but likely more
modules would benefit from this feature.
  • Loading branch information
tsg committed Nov 20, 2017
1 parent 68b4f6a commit 9f473a9
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 17 deletions.
34 changes: 22 additions & 12 deletions filebeat/fileset/fileset.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,13 @@ func (fs *Fileset) evaluateVars() (map[string]interface{}, error) {
func resolveVariable(vars map[string]interface{}, value interface{}) (interface{}, error) {
switch v := value.(type) {
case string:
return applyTemplate(vars, v)
return applyTemplate(vars, v, false)
case []interface{}:
transformed := []interface{}{}
for _, val := range v {
s, ok := val.(string)
if ok {
transf, err := applyTemplate(vars, s)
transf, err := applyTemplate(vars, s, false)
if err != nil {
return nil, fmt.Errorf("array: %v", err)
}
Expand All @@ -180,9 +180,15 @@ func resolveVariable(vars map[string]interface{}, value interface{}) (interface{
return value, nil
}

// applyTemplate applies a Golang text/template
func applyTemplate(vars map[string]interface{}, templateString string) (string, error) {
tpl, err := template.New("text").Parse(templateString)
// applyTemplate applies a Golang text/template. If specialDelims is set to true,
// the delimiters are set to `{%` and `%}` instead of `{{` and `}}`. These are easier to use
// in pipeline definitions.
func applyTemplate(vars map[string]interface{}, templateString string, specialDelims bool) (string, error) {
tpl := template.New("text")
if specialDelims {
tpl = tpl.Delims("{%", "%}")
}
tpl, err := tpl.Parse(templateString)
if err != nil {
return "", fmt.Errorf("Error parsing template %s: %v", templateString, err)
}
Expand Down Expand Up @@ -215,7 +221,7 @@ func (fs *Fileset) getBuiltinVars() (map[string]interface{}, error) {
}

func (fs *Fileset) getProspectorConfig() (*common.Config, error) {
path, err := applyTemplate(fs.vars, fs.manifest.Prospector)
path, err := applyTemplate(fs.vars, fs.manifest.Prospector, false)
if err != nil {
return nil, fmt.Errorf("Error expanding vars on the prospector path: %v", err)
}
Expand All @@ -224,7 +230,7 @@ func (fs *Fileset) getProspectorConfig() (*common.Config, error) {
return nil, fmt.Errorf("Error reading prospector file %s: %v", path, err)
}

yaml, err := applyTemplate(fs.vars, string(contents))
yaml, err := applyTemplate(fs.vars, string(contents), false)
if err != nil {
return nil, fmt.Errorf("Error interpreting the template of the prospector: %v", err)
}
Expand Down Expand Up @@ -269,7 +275,7 @@ func (fs *Fileset) getProspectorConfig() (*common.Config, error) {

// getPipelineID returns the Ingest Node pipeline ID
func (fs *Fileset) getPipelineID(beatVersion string) (string, error) {
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline)
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline, false)
if err != nil {
return "", fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err)
}
Expand All @@ -278,18 +284,22 @@ func (fs *Fileset) getPipelineID(beatVersion string) (string, error) {
}

func (fs *Fileset) GetPipeline() (pipelineID string, content map[string]interface{}, err error) {
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline)
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline, false)
if err != nil {
return "", nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err)
}

f, err := os.Open(filepath.Join(fs.modulePath, fs.name, path))
strContents, err := ioutil.ReadFile(filepath.Join(fs.modulePath, fs.name, path))
if err != nil {
return "", nil, fmt.Errorf("Error reading pipeline file %s: %v", path, err)
}

dec := json.NewDecoder(f)
err = dec.Decode(&content)
jsonString, err := applyTemplate(fs.vars, string(strContents), true)
if err != nil {
return "", nil, fmt.Errorf("Error interpreting the template of the ingest pipeline: %v", err)
}

err = json.Unmarshal([]byte(jsonString), &content)
if err != nil {
return "", nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err)
}
Expand Down
4 changes: 4 additions & 0 deletions filebeat/module/system/syslog/config/syslog.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ exclude_files: [".gz$"]
multiline:
pattern: "^\\s"
match: after
{{ if .convert_timezone }}
processors:
- add_locale: ~
{{ end }}
5 changes: 3 additions & 2 deletions filebeat/module/system/syslog/ingest/pipeline.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@
"field": "message"
}
},
{
{
"date": {
"field": "system.syslog.timestamp",
"target_field": "@timestamp",
"formats": [
"MMM d HH:mm:ss",
"MMM dd HH:mm:ss"
],
{% if .convert_timezone %}"timezone": "{{ beat.timezone }}",{% end %}
"ignore_failure": true
}
}
}
],
"on_failure" : [{
"set" : {
Expand Down
2 changes: 2 additions & 0 deletions filebeat/module/system/syslog/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ var:
os.darwin:
- /var/log/system.log*
os.windows: []
- name: convert_timezone
default: false

ingest_pipeline: ingest/pipeline.json
prospector: config/syslog.yml
6 changes: 3 additions & 3 deletions testing/environments/latest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
version: '2.1'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:6.0.0-rc2
image: docker.elastic.co/elasticsearch/elasticsearch:6.0.0
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9200"]
environment:
Expand All @@ -18,13 +18,13 @@ services:
context: docker/logstash
dockerfile: Dockerfile
args:
ELASTIC_VERSION: 6.0.0-rc2
ELASTIC_VERSION: 6.0.0
DOWNLOAD_URL: https://artifacts.elastic.co/downloads
environment:
- ES_HOST=elasticsearch

kibana:
image: docker.elastic.co/kibana/kibana:6.0.0-rc2
image: docker.elastic.co/kibana/kibana:6.0.0
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:5601"]
retries: 6

0 comments on commit 9f473a9

Please sign in to comment.