Skip to content

Commit

Permalink
Possibility to create elasticsearch mapping on startup
Browse files Browse the repository at this point in the history
  • Loading branch information
ruflin committed Jan 27, 2016
1 parent 8b5ab89 commit 2f26ec1
Show file tree
Hide file tree
Showing 12 changed files with 277 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff]
- Update builds to Golang version 1.5.3
- Add ability to override configuration settings using environment variables {issue}114[114]
- Libbeat now always exits through a single exit method for proper cleanup and control {pull}736[736]
- Possibility to create elasticsearch mapping on startup {pull}639[639]

*Packetbeat*
- Change the DNS library used throughout the dns package to github.com/miekg/dns. {pull}803[803]
Expand Down
17 changes: 17 additions & 0 deletions filebeat/filebeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ filebeat:
# Event count spool threshold - forces network flush if exceeded
#spool_size: 2048

# Enable async publisher pipeline in filebeat (Experimental!)
#publish_async: false

# Defines how often the spooler is flushed. After idle_timeout the spooler is
# Flush even though spool_size is not reached.
#idle_timeout: 5s
Expand Down Expand Up @@ -196,6 +199,20 @@ output:
# [filebeat-]YYYY.MM.DD keys.
#index: "filebeat"

# A template is used to set the mapping in elasticsearch
# By default template loading is disabled an not template is loaded.
# These settings can be adjusted to load your own template or overwrite existing ones
#template:

# Template name. By default the template name is the same as the filebeat
#name: "filebeat"

# Path to template file
#path: "filebeat.template.json"

# Overwrite existing template
#overwrite: false

# Optional HTTP Path
#path: "/elasticsearch"

Expand Down
14 changes: 14 additions & 0 deletions libbeat/etc/libbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ output:
# [beatname-]YYYY.MM.DD keys.
#index: "beatname"

# A template is used to set the mapping in elasticsearch
# By default template loading is disabled an not template is loaded.
# These settings can be adjusted to load your own template or overwrite existing ones
#template:

# Template name. By default the template name is the same as the beatname
#name: "beatname"

# Path to template file
#path: "beatname.template.json"

# Overwrite existing template
#overwrite: false

# Optional HTTP Path
#path: "/elasticsearch"

Expand Down
34 changes: 33 additions & 1 deletion libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,38 @@ func (client *Client) PublishEvent(event common.MapStr) error {
return nil
}

// LoadTemplate loads a template into Elasticsearch overwriting the existing
// template if it exists. If you wish to not overwrite an existing template
// then use CheckTemplate prior to calling this method.
func (client *Client) LoadTemplate(templateName string, reader *bytes.Reader) error {

status, _, err := client.execRequest("PUT", client.URL+"/_template/"+templateName, reader)

if err != nil {
return fmt.Errorf("Template could not be loaded. Error: %s", err)
}
if status != 200 {
return fmt.Errorf("Template could not be loaded. Status: %v", status)
}

logp.Info("Elasticsearch template with name '%s' loaded", templateName)

return nil
}

// CheckTemplate checks if a given template already exist. It returns true if
// and only if Elasticsearch returns with HTTP status code 200.
func (client *Client) CheckTemplate(templateName string) bool {

status, _, _ := client.request("HEAD", "/_template/"+templateName, nil, nil)

if status != 200 {
return false
}

return true
}

func (conn *Connection) Connect(timeout time.Duration) error {
var err error
conn.connected, err = conn.Ping(timeout)
Expand Down Expand Up @@ -390,7 +422,7 @@ func (conn *Connection) request(
body interface{},
) (int, []byte, error) {
url := makeURL(conn.URL, path, params)
logp.Debug("elasticsearch", "%s %s %s", method, url, body)
logp.Debug("elasticsearch", "%s %s %v", method, url, body)

var obj []byte
if body != nil {
Expand Down
76 changes: 76 additions & 0 deletions libbeat/outputs/elasticsearch/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"testing"
"time"

"bytes"
"github.com/stretchr/testify/assert"
"io/ioutil"
"path/filepath"
)

func TestClientConnect(t *testing.T) {
Expand All @@ -18,3 +21,76 @@ func TestClientConnect(t *testing.T) {
assert.Nil(t, err)
assert.True(t, client.IsConnected())
}

func TestCheckTemplate(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test in short mode because it requires ES")
}

client := GetTestingElasticsearch()
err := client.Connect(5 * time.Second)
assert.Nil(t, err)

// Check for non existant template
assert.False(t, client.CheckTemplate("libbeat"))
}

func TestLoadTemplate(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test in short mode because it requires ES")
}

// Load template
absPath, err := filepath.Abs("../../tests/files/")
assert.NotNil(t, absPath)
assert.Nil(t, err)

templatePath := absPath + "/template.json"
content, err := ioutil.ReadFile(templatePath)
reader := bytes.NewReader(content)
assert.Nil(t, err)

// Setup ES
client := GetTestingElasticsearch()
err = client.Connect(5 * time.Second)
assert.Nil(t, err)

templateName := "testbeat"

// Load template
err = client.LoadTemplate(templateName, reader)
assert.Nil(t, err)

// Make sure template was loaded
assert.True(t, client.CheckTemplate(templateName))

// Delete template again to clean up
client.request("DELETE", "/_template/"+templateName, nil, nil)

// Make sure it was removed
assert.False(t, client.CheckTemplate(templateName))

}

func TestLoadInvalidTemplate(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test in short mode because it requires ES")
}

// Invalid Template
reader := bytes.NewReader([]byte("{json:invalid}"))

// Setup ES
client := GetTestingElasticsearch()
err := client.Connect(5 * time.Second)
assert.Nil(t, err)

templateName := "invalidtemplate"

// Try to load invalid template
err = client.LoadTemplate(templateName, reader)
assert.Error(t, err)

// Make sure template was not loaded
assert.False(t, client.CheckTemplate(templateName))
}
45 changes: 45 additions & 0 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"strings"
"time"

"bytes"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/mode"
"io/ioutil"
)

var debug = logp.MakeDebug("elasticsearch")
Expand Down Expand Up @@ -77,6 +79,7 @@ func (out *elasticsearchOutput) init(
}

clients, err := mode.MakeClients(config, makeClientFactory(tlsConfig, config))

if err != nil {
return err
}
Expand Down Expand Up @@ -117,6 +120,8 @@ func (out *elasticsearchOutput) init(
return err
}

loadTemplate(config.Template, clients)

if config.Save_topology {
err := out.EnableTTL()
if err != nil {
Expand Down Expand Up @@ -146,6 +151,46 @@ func (out *elasticsearchOutput) init(
return nil
}

// loadTemplate checks if the index mapping template should be loaded
// In case template loading is enabled, template is written to index
func loadTemplate(config outputs.Template, clients []mode.ProtocolClient) {
// Check if template should be loaded
// Not being able to load the template will output an error but will not stop execution
if config.Name != "" && len(clients) > 0 {

// Always takes the first client
esClient := clients[0].(*Client)

logp.Info("Loading template enabled. Trying to load template: %v", config.Path)

exists := esClient.CheckTemplate(config.Name)

// Check if template already exist or should be overwritten
if !exists || config.Overwrite {

if config.Overwrite {
logp.Info("Existing template will be overwritten, as overwrite is enabled.")
}

// Load template from file
content, err := ioutil.ReadFile(config.Path)
if err != nil {
logp.Err("Could not load template from file path: %s; Error: %s", config.Path, err)
} else {
reader := bytes.NewReader(content)
err = esClient.LoadTemplate(config.Name, reader)

if err != nil {
logp.Err("Could not load template: %v", err)
}
}
} else {
logp.Info("Template already exists and will not be overwritten.")
}

}
}

func makeClientFactory(
tls *tls.Config,
config outputs.MothershipConfig,
Expand Down
7 changes: 7 additions & 0 deletions libbeat/outputs/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type MothershipConfig struct {
ProxyURL string `yaml:"proxy_url"`
Index string
Path string
Template Template
Db int
Db_topology int
Timeout int
Expand All @@ -34,6 +35,12 @@ type MothershipConfig struct {
CompressionLevel *int `yaml:"compression_level"`
}

type Template struct {
Name string
Path string
Overwrite bool
}

type Options struct {
Guaranteed bool
}
Expand Down
42 changes: 42 additions & 0 deletions libbeat/tests/files/template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"mappings": {
"_default_": {
"_all": {
"enabled": true,
"norms": {
"enabled": false
}
},
"dynamic_templates": [
{
"template1": {
"mapping": {
"doc_values": true,
"ignore_above": 1024,
"index": "not_analyzed",
"type": "{dynamic_type}"
},
"match": "*"
}
}
],
"properties": {
"@timestamp": {
"type": "date"
},
"message": {
"type": "string",
"index": "analyzed"
},
"offset": {
"type": "long",
"doc_values": "true"
}
}
}
},
"settings": {
"index.refresh_interval": "5s"
},
"template": "mockbeat-*"
}
1 change: 0 additions & 1 deletion libbeat/tests/system/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ def test_base(self):
Basic test with exiting Mockbeat normally
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*"
)

exit_code = self.run_beat()
Expand Down
14 changes: 14 additions & 0 deletions packetbeat/packetbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,20 @@ output:
# [packetbeat-]YYYY.MM.DD keys.
#index: "packetbeat"

# A template is used to set the mapping in elasticsearch
# By default template loading is disabled an not template is loaded.
# These settings can be adjusted to load your own template or overwrite existing ones
#template:

# Template name. By default the template name is the same as the packetbeat
#name: "packetbeat"

# Path to template file
#path: "packetbeat.template.json"

# Overwrite existing template
#overwrite: false

# Optional HTTP Path
#path: "/elasticsearch"

Expand Down
14 changes: 14 additions & 0 deletions topbeat/topbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,20 @@ output:
# [topbeat-]YYYY.MM.DD keys.
#index: "topbeat"

# A template is used to set the mapping in elasticsearch
# By default template loading is disabled an not template is loaded.
# These settings can be adjusted to load your own template or overwrite existing ones
#template:

# Template name. By default the template name is the same as the topbeat
#name: "topbeat"

# Path to template file
#path: "topbeat.template.json"

# Overwrite existing template
#overwrite: false

# Optional HTTP Path
#path: "/elasticsearch"

Expand Down
Loading

0 comments on commit 2f26ec1

Please sign in to comment.