diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 9b092f92c2ff..309ea89b05e0 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha1...master[Check the HEAD d
*Affecting all Beats*
- Unify dashboard exporter tools. {pull}9097[9097]
+- Use _doc as document type of the Elasticsearch major version is 7. {pull}9056[9056]
*Auditbeat*
diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go
index 85ce0b47e3b2..9cb36dbddfd9 100644
--- a/filebeat/beater/filebeat.go
+++ b/filebeat/beater/filebeat.go
@@ -221,12 +221,7 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err
return errors.Errorf("Error creating Kibana client: %v", err)
}
- kibanaVersion, err := common.NewVersion(kibanaClient.GetVersion())
- if err != nil {
- return errors.Errorf("Error checking Kibana version: %v", err)
- }
-
- if err := setupMLBasedOnVersion(fb.moduleRegistry, esClient, kibanaClient, kibanaVersion); err != nil {
+ if err := setupMLBasedOnVersion(fb.moduleRegistry, esClient, kibanaClient); err != nil {
errs = append(errs, err)
}
@@ -252,7 +247,7 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err
continue
}
- if err := setupMLBasedOnVersion(set, esClient, kibanaClient, kibanaVersion); err != nil {
+ if err := setupMLBasedOnVersion(set, esClient, kibanaClient); err != nil {
errs = append(errs, err)
}
@@ -262,18 +257,16 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err
return errs.Err()
}
-func setupMLBasedOnVersion(reg *fileset.ModuleRegistry, esClient *elasticsearch.Client, kibanaClient *kibana.Client, kibanaVersion *common.Version) error {
- if isElasticsearchLoads(kibanaVersion) {
+func setupMLBasedOnVersion(reg *fileset.ModuleRegistry, esClient *elasticsearch.Client, kibanaClient *kibana.Client) error {
+ if isElasticsearchLoads(kibanaClient.GetVersion()) {
return reg.LoadML(esClient)
}
return reg.SetupML(esClient, kibanaClient)
}
-func isElasticsearchLoads(kibanaVersion *common.Version) bool {
- if kibanaVersion.Major < 6 || kibanaVersion.Major == 6 && kibanaVersion.Minor < 1 {
- return true
- }
- return false
+func isElasticsearchLoads(kibanaVersion common.Version) bool {
+ return kibanaVersion.Major < 6 ||
+ (kibanaVersion.Major == 6 && kibanaVersion.Minor < 1)
}
// Run allows the beater to be run as a beat.
diff --git a/filebeat/fileset/fileset.go b/filebeat/fileset/fileset.go
index 27646ca34b20..59a30b7e17c7 100644
--- a/filebeat/fileset/fileset.go
+++ b/filebeat/fileset/fileset.go
@@ -25,6 +25,7 @@ package fileset
import (
"bytes"
"encoding/json"
+ "errors"
"fmt"
"io/ioutil"
"os"
@@ -194,15 +195,14 @@ func (fs *Fileset) evaluateVars() (map[string]interface{}, error) {
// turnOffElasticsearchVars re-evaluates the variables that have `min_elasticsearch_version`
// set.
-func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersion string) (map[string]interface{}, error) {
+func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersion common.Version) (map[string]interface{}, error) {
retVars := map[string]interface{}{}
for key, val := range vars {
retVars[key] = val
}
- haveVersion, err := common.NewVersion(esVersion)
- if err != nil {
- return vars, fmt.Errorf("Error parsing version %s: %v", esVersion, err)
+ if !esVersion.IsValid() {
+ return vars, errors.New("Unknown Elasticsearch version")
}
for _, vals := range fs.manifest.Vars {
@@ -219,11 +219,11 @@ func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersi
return vars, fmt.Errorf("Error parsing version %s: %v", minESVersion["version"].(string), err)
}
- logp.Debug("fileset", "Comparing ES version %s with requirement of %s", haveVersion, minVersion)
+ logp.Debug("fileset", "Comparing ES version %s with requirement of %s", esVersion.String(), minVersion)
- if haveVersion.LessThan(minVersion) {
+ if esVersion.LessThan(minVersion) {
retVars[name] = minESVersion["value"]
- logp.Info("Setting var %s (%s) to %v because Elasticsearch version is %s", name, fs, minESVersion["value"], haveVersion)
+ logp.Info("Setting var %s (%s) to %v because Elasticsearch version is %s", name, fs, minESVersion["value"], esVersion.String())
}
}
}
@@ -360,7 +360,7 @@ func (fs *Fileset) getPipelineID(beatVersion string) (string, error) {
}
// GetPipeline returns the JSON content of the Ingest Node pipeline that parses the logs.
-func (fs *Fileset) GetPipeline(esVersion string) (pipelineID string, content map[string]interface{}, err error) {
+func (fs *Fileset) GetPipeline(esVersion common.Version) (pipelineID string, content map[string]interface{}, err error) {
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline, false)
if err != nil {
return "", nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err)
diff --git a/filebeat/fileset/fileset_test.go b/filebeat/fileset/fileset_test.go
index 93456b2ee3b7..8435532efa7a 100644
--- a/filebeat/fileset/fileset_test.go
+++ b/filebeat/fileset/fileset_test.go
@@ -27,7 +27,9 @@ import (
"testing"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)
@@ -213,7 +215,8 @@ func TestGetPipelineNginx(t *testing.T) {
fs := getModuleForTesting(t, "nginx", "access")
assert.NoError(t, fs.Read("5.2.0"))
- pipelineID, content, err := fs.GetPipeline("5.2.0")
+ version := common.MustNewVersion("5.2.0")
+ pipelineID, content, err := fs.GetPipeline(*version)
assert.NoError(t, err)
assert.Equal(t, "filebeat-5.2.0-nginx-access-default", pipelineID)
assert.Contains(t, content, "description")
@@ -234,27 +237,31 @@ func TestGetPipelineConvertTS(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, fs.Read("6.1.0"))
- // ES 6.0.0 should not have beat.timezone referenced
- pipelineID, content, err := fs.GetPipeline("6.0.0")
- assert.NoError(t, err)
- assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID)
- marshaled, err := json.Marshal(content)
- assert.NoError(t, err)
- assert.NotContains(t, string(marshaled), "beat.timezone")
-
- // ES 6.1.0 should have beat.timezone referenced
- pipelineID, content, err = fs.GetPipeline("6.1.0")
- assert.NoError(t, err)
- assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID)
- marshaled, err = json.Marshal(content)
- assert.NoError(t, err)
- assert.Contains(t, string(marshaled), "beat.timezone")
+ cases := map[string]struct {
+ Beat string
+ Timezone bool
+ }{
+ "6.0.0": {Timezone: false},
+ "6.1.0": {Timezone: true},
+ "6.2.0": {Timezone: true},
+ }
- // ES 6.2.0 should have beat.timezone referenced
- pipelineID, content, err = fs.GetPipeline("6.2.0")
- assert.NoError(t, err)
- assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID)
- marshaled, err = json.Marshal(content)
- assert.NoError(t, err)
- assert.Contains(t, string(marshaled), "beat.timezone")
+ for esVersion, cfg := range cases {
+ pipelineName := "filebeat-6.1.0-system-syslog-pipeline"
+
+ t.Run(fmt.Sprintf("es=%v", esVersion), func(t *testing.T) {
+ ver := common.MustNewVersion(esVersion)
+ pipelineID, content, err := fs.GetPipeline(*ver)
+ require.NoError(t, err)
+ assert.Equal(t, pipelineName, pipelineID)
+
+ marshaled, err := json.Marshal(content)
+ require.NoError(t, err)
+ if cfg.Timezone {
+ assert.Contains(t, string(marshaled), "beat.timezone")
+ } else {
+ assert.NotContains(t, string(marshaled), "beat.timezone")
+ }
+ })
+ }
}
diff --git a/filebeat/fileset/modules_integration_test.go b/filebeat/fileset/modules_integration_test.go
index dbf786053a85..45baabdc354d 100644
--- a/filebeat/fileset/modules_integration_test.go
+++ b/filebeat/fileset/modules_integration_test.go
@@ -22,7 +22,6 @@ package fileset
import (
"encoding/json"
"path/filepath"
- "strconv"
"testing"
"github.com/stretchr/testify/assert"
@@ -142,11 +141,5 @@ func TestAvailableProcessors(t *testing.T) {
func hasIngest(client *elasticsearch.Client) bool {
v := client.GetVersion()
- majorVersion := string(v[0])
- version, err := strconv.Atoi(majorVersion)
- if err != nil {
- return true
- }
-
- return version >= 5
+ return v.Major >= 5
}
diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go
index b67e9fce4874..d7f63bdabadd 100644
--- a/filebeat/fileset/pipelines.go
+++ b/filebeat/fileset/pipelines.go
@@ -22,6 +22,7 @@ import (
"fmt"
"strings"
+ "github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)
@@ -33,7 +34,7 @@ type PipelineLoaderFactory func() (PipelineLoader, error)
type PipelineLoader interface {
LoadJSON(path string, json map[string]interface{}) ([]byte, error)
Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error)
- GetVersion() string
+ GetVersion() common.Version
}
// LoadPipelines loads the pipelines for each configured fileset.
diff --git a/filebeat/scripts/tester/main.go b/filebeat/scripts/tester/main.go
index 5a3d277b6e86..ebac853d9c07 100644
--- a/filebeat/scripts/tester/main.go
+++ b/filebeat/scripts/tester/main.go
@@ -264,7 +264,7 @@ func runSimulate(url string, pipeline map[string]interface{}, logs []string, ver
for _, s := range sources {
d := common.MapStr{
"_index": "index",
- "_type": "doc",
+ "_type": "_doc",
"_id": "id",
"_source": s,
}
diff --git a/libbeat/cmd/export/dashboard.go b/libbeat/cmd/export/dashboard.go
index 697da10061e0..4cf0c7db30e0 100644
--- a/libbeat/cmd/export/dashboard.go
+++ b/libbeat/cmd/export/dashboard.go
@@ -73,6 +73,7 @@ func GenDashboardCmd(name, idxPrefix, beatVersion string) *cobra.Command {
if decode {
r = dashboards.DecodeExported(r)
}
+
err = dashboards.SaveToFile(r, info.Dashboards[i].File, filepath.Dir(yml), client.GetVersion())
if err != nil {
fmt.Fprintf(os.Stderr, "Error saving dashboard '%s' to file '%s' : %+v\n",
diff --git a/libbeat/cmd/export/template.go b/libbeat/cmd/export/template.go
index bf3cbbb9b274..d56330a62ff5 100644
--- a/libbeat/cmd/export/template.go
+++ b/libbeat/cmd/export/template.go
@@ -57,7 +57,16 @@ func GenTemplateConfigCmd(settings instance.Settings, name, idxPrefix, beatVersi
}
}
- tmpl, err := template.New(b.Info.Version, index, version, cfg)
+ if version == "" {
+ version = b.Info.Version
+ }
+
+ esVersion, err := common.NewVersion(version)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Invalid Elasticsearch version: %s\n", err)
+ }
+
+ tmpl, err := template.New(b.Info.Version, index, *esVersion, cfg)
if err != nil {
fmt.Fprintf(os.Stderr, "Error generating template: %+v", err)
os.Exit(1)
diff --git a/libbeat/common/version.go b/libbeat/common/version.go
index 795eb2b8ddb2..54293e9ff792 100644
--- a/libbeat/common/version.go
+++ b/libbeat/common/version.go
@@ -31,6 +31,16 @@ type Version struct {
Meta string
}
+// MustNewVersion creates a version from the given version string.
+// If the version string is invalid, MustNewVersion panics.
+func MustNewVersion(version string) *Version {
+ v, err := NewVersion(version)
+ if err != nil {
+ panic(err)
+ }
+ return v
+}
+
// NewVersion expects a string in the format:
// major.minor.bugfix(-meta)
func NewVersion(version string) (*Version, error) {
@@ -69,6 +79,11 @@ func NewVersion(version string) (*Version, error) {
return &v, nil
}
+// IsValid returns true if the version object stores a successfully parsed version number.
+func (v *Version) IsValid() bool {
+ return v.version != ""
+}
+
func (v *Version) IsMajor(major int) bool {
return major == v.Major
}
diff --git a/libbeat/dashboards/dashboards.go b/libbeat/dashboards/dashboards.go
index 81106f6064f7..833c7a1ebe89 100644
--- a/libbeat/dashboards/dashboards.go
+++ b/libbeat/dashboards/dashboards.go
@@ -22,8 +22,6 @@ import (
"errors"
"fmt"
"path/filepath"
- "strconv"
- "strings"
errw "github.com/pkg/errors"
@@ -106,12 +104,7 @@ func ImportDashboards(
esLoader.statusMsg("Elasticsearch URL %v", esLoader.client.Connection.URL)
- majorVersion, _, err := getMajorAndMinorVersion(esLoader.version)
- if err != nil {
- return fmt.Errorf("wrong Elasticsearch version: %v", err)
- }
-
- if majorVersion < 6 {
+ if esLoader.version.Major < 6 {
importVia = importViaES
} else {
importVia = useKibana
@@ -145,17 +138,16 @@ func setupAndImportDashboardsViaKibana(ctx context.Context, hostname string, kib
}
func ImportDashboardsViaKibana(kibanaLoader *KibanaLoader) error {
-
- if !isKibanaAPIavailable(kibanaLoader.version) {
- return fmt.Errorf("Kibana API is not available in Kibana version %s", kibanaLoader.version)
+ version := kibanaLoader.version
+ if !version.IsValid() {
+ return errors.New("No valid kibana version available")
}
- version, err := common.NewVersion(kibanaLoader.version)
- if err != nil {
- return fmt.Errorf("Invalid Kibana version: %s", kibanaLoader.version)
+ if !isKibanaAPIavailable(kibanaLoader.version) {
+ return fmt.Errorf("Kibana API is not available in Kibana version %s", kibanaLoader.version.String())
}
- importer, err := NewImporter(*version, kibanaLoader.config, kibanaLoader)
+ importer, err := NewImporter(version, kibanaLoader.config, kibanaLoader)
if err != nil {
return fmt.Errorf("fail to create a Kibana importer for loading the dashboards: %v", err)
}
@@ -187,40 +179,6 @@ func ImportDashboardsViaElasticsearch(esLoader *ElasticsearchLoader) error {
return nil
}
-func getMajorAndMinorVersion(version string) (int, int, error) {
- fields := strings.Split(version, ".")
- if len(fields) != 3 {
- return 0, 0, fmt.Errorf("wrong version %s", version)
- }
- majorVersion := fields[0]
- minorVersion := fields[1]
-
- majorVersionInt, err := strconv.Atoi(majorVersion)
- if err != nil {
- return 0, 0, err
- }
-
- minorVersionInt, err := strconv.Atoi(minorVersion)
- if err != nil {
- return 0, 0, err
- }
-
- return majorVersionInt, minorVersionInt, nil
-}
-
-func isKibanaAPIavailable(version string) bool {
- majorVersion, minorVersion, err := getMajorAndMinorVersion(version)
- if err != nil {
- return false
- }
-
- if majorVersion == 5 && minorVersion >= 6 {
- return true
- }
-
- if majorVersion >= 6 {
- return true
- }
-
- return false
+func isKibanaAPIavailable(version common.Version) bool {
+ return (version.Major == 5 && version.Minor >= 6) || version.Major >= 6
}
diff --git a/libbeat/dashboards/es_loader.go b/libbeat/dashboards/es_loader.go
index 270e2f26dd0a..4787bb3faf59 100644
--- a/libbeat/dashboards/es_loader.go
+++ b/libbeat/dashboards/es_loader.go
@@ -19,6 +19,7 @@ package dashboards
import (
"encoding/json"
+ "errors"
"fmt"
"io/ioutil"
"path"
@@ -33,7 +34,7 @@ import (
type ElasticsearchLoader struct {
client *elasticsearch.Client
config *Config
- version string
+ version common.Version
msgOutputter MessageOutputter
}
@@ -48,6 +49,9 @@ func NewElasticsearchLoader(cfg *common.Config, dashboardsConfig *Config, msgOut
}
version := esClient.GetVersion()
+ if !version.IsValid() {
+ return nil, errors.New("No valid Elasticsearch version available")
+ }
loader := ElasticsearchLoader{
client: esClient,
@@ -56,7 +60,7 @@ func NewElasticsearchLoader(cfg *common.Config, dashboardsConfig *Config, msgOut
msgOutputter: msgOutputter,
}
- loader.statusMsg("Initialize the Elasticsearch %s loader", version)
+ loader.statusMsg("Initialize the Elasticsearch %s loader", version.String())
return &loader, nil
}
diff --git a/libbeat/dashboards/es_loader_test.go b/libbeat/dashboards/es_loader_test.go
index 4b9c468a335f..29f5684da90b 100644
--- a/libbeat/dashboards/es_loader_test.go
+++ b/libbeat/dashboards/es_loader_test.go
@@ -20,7 +20,6 @@
package dashboards
import (
- "strings"
"testing"
"github.com/stretchr/testify/assert"
@@ -40,8 +39,9 @@ func TestImporter(t *testing.T) {
}
client := estest.GetTestingElasticsearch(t)
- if strings.HasPrefix(client.Connection.GetVersion(), "6.") ||
- strings.HasPrefix(client.Connection.GetVersion(), "7.") {
+ major := client.GetVersion().Major
+
+ if major == 6 || major == 7 {
t.Skip("Skipping tests for Elasticsearch 6.x releases")
}
@@ -76,8 +76,8 @@ func TestImporterEmptyBeat(t *testing.T) {
}
client := estest.GetTestingElasticsearch(t)
- if strings.HasPrefix(client.Connection.GetVersion(), "6.") ||
- strings.HasPrefix(client.Connection.GetVersion(), "7.") {
+ major := client.GetVersion().Major
+ if major == 6 || major == 7 {
t.Skip("Skipping tests for Elasticsearch 6.x releases")
}
diff --git a/libbeat/dashboards/export.go b/libbeat/dashboards/export.go
index d74a1d84b7b8..cfb3074ae05e 100644
--- a/libbeat/dashboards/export.go
+++ b/libbeat/dashboards/export.go
@@ -84,14 +84,9 @@ func ExportAll(client *kibana.Client, list ListYML) ([]common.MapStr, error) {
}
// SaveToFile creates the required directories if needed and saves dashboard.
-func SaveToFile(dashboard common.MapStr, filename, root, versionStr string) error {
- version, err := common.NewVersion(versionStr)
- if err != nil {
- return err
- }
-
+func SaveToFile(dashboard common.MapStr, filename, root string, version common.Version) error {
dashboardsPath := "_meta/kibana/" + strconv.Itoa(version.Major) + "/dashboard"
- err = generator.CreateDirectories(root, dashboardsPath)
+ err := generator.CreateDirectories(root, dashboardsPath)
if err != nil {
return err
}
diff --git a/libbeat/dashboards/kibana_loader.go b/libbeat/dashboards/kibana_loader.go
index 8a09e72bfaea..6c6e291e4aef 100644
--- a/libbeat/dashboards/kibana_loader.go
+++ b/libbeat/dashboards/kibana_loader.go
@@ -35,7 +35,7 @@ var importAPI = "/api/kibana/dashboards/import"
type KibanaLoader struct {
client *kibana.Client
config *Config
- version string
+ version common.Version
hostname string
msgOutputter MessageOutputter
}
@@ -59,7 +59,8 @@ func NewKibanaLoader(ctx context.Context, cfg *common.Config, dashboardsConfig *
msgOutputter: msgOutputter,
}
- loader.statusMsg("Initialize the Kibana %s loader", client.GetVersion())
+ version := client.GetVersion()
+ loader.statusMsg("Initialize the Kibana %s loader", version.String())
return &loader, nil
}
diff --git a/libbeat/kibana/client.go b/libbeat/kibana/client.go
index fe21af40dcba..115a1bffc299 100644
--- a/libbeat/kibana/client.go
+++ b/libbeat/kibana/client.go
@@ -43,7 +43,7 @@ type Connection struct {
Headers map[string]string
http *http.Client
- version string
+ version common.Version
}
type Client struct {
@@ -146,7 +146,7 @@ func NewClientWithConfig(config *ClientConfig) (*Client, error) {
}
if !config.IgnoreVersion {
- if err = client.SetVersion(); err != nil {
+ if err = client.readVersion(); err != nil {
return nil, fmt.Errorf("fail to get the Kibana version: %v", err)
}
}
@@ -178,7 +178,7 @@ func (conn *Connection) Request(method, extraPath string,
}
if method != "GET" {
- req.Header.Set("kbn-version", conn.version)
+ req.Header.Set("kbn-version", conn.version.String())
}
resp, err := conn.http.Do(req)
@@ -201,7 +201,7 @@ func (conn *Connection) Request(method, extraPath string,
return resp.StatusCode, result, retError
}
-func (client *Client) SetVersion() error {
+func (client *Client) readVersion() error {
type kibanaVersionResponse struct {
Name string `json:"name"`
Version struct {
@@ -221,11 +221,12 @@ func (client *Client) SetVersion() error {
err, truncateString(result))
}
- var kibanaVersion kibanaVersionResponse
- var kibanaVersion5x kibanaVersionResponse5x
+ var versionString string
+ var kibanaVersion kibanaVersionResponse
err = json.Unmarshal(result, &kibanaVersion)
if err != nil {
+ var kibanaVersion5x kibanaVersionResponse5x
// The response returned by /api/status is different in Kibana 5.x than in Kibana 6.x
err5x := json.Unmarshal(result, &kibanaVersion5x)
@@ -234,21 +235,28 @@ func (client *Client) SetVersion() error {
return fmt.Errorf("fail to unmarshal the response from GET %s/api/status. Response: %s. Kibana 5.x status api returns: %v. Kibana 6.x status api returns: %v",
client.Connection.URL, truncateString(result), err5x, err)
}
- client.version = kibanaVersion5x.Version
+ versionString = kibanaVersion5x.Version
} else {
-
- client.version = kibanaVersion.Version.Number
+ versionString = kibanaVersion.Version.Number
if kibanaVersion.Version.Snapshot {
// needed for the tests
- client.version = client.version + "-SNAPSHOT"
+ versionString += "-SNAPSHOT"
}
}
+ version, err := common.NewVersion(versionString)
+ if err != nil {
+ return fmt.Errorf("fail to parse kibana version (%v): %+v", versionString, err)
+ }
+
+ client.version = *version
return nil
}
-func (client *Client) GetVersion() string { return client.version }
+// GetVersion returns the version read from kibana. The version is not set if
+// IgnoreVersion was set when creating the client.
+func (client *Client) GetVersion() common.Version { return client.version }
func (client *Client) ImportJSON(url string, params url.Values, jsonBody map[string]interface{}) error {
diff --git a/libbeat/ml-importer/importer.go b/libbeat/ml-importer/importer.go
index 47b8b93b3aee..892c8dae47ad 100644
--- a/libbeat/ml-importer/importer.go
+++ b/libbeat/ml-importer/importer.go
@@ -56,13 +56,13 @@ type MLConfig struct {
type MLLoader interface {
Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error)
LoadJSON(path string, json map[string]interface{}) ([]byte, error)
- GetVersion() string
+ GetVersion() common.Version
}
// MLSetupper is a subset of the Kibana client API capable of setting up ML objects.
type MLSetupper interface {
Request(method, path string, params url.Values, headers http.Header, body io.Reader) (int, []byte, error)
- GetVersion() string
+ GetVersion() common.Version
}
// MLResponse stores the relevant parts of the response from Kibana to check for errors.
@@ -125,10 +125,11 @@ func ImportMachineLearningJob(esClient MLLoader, cfg *MLConfig) error {
datafeedURL := fmt.Sprintf(esDataFeedURL, cfg.ID)
if len(cfg.MinVersion) > 0 {
- esVersion, err := common.NewVersion(esClient.GetVersion())
- if err != nil {
- return errors.Errorf("Error parsing ES version: %s: %v", esClient.GetVersion(), err)
+ esVersion := esClient.GetVersion()
+ if !esVersion.IsValid() {
+ return errors.New("Invalid Elasticsearch version")
}
+
minVersion, err := common.NewVersion(cfg.MinVersion)
if err != nil {
return errors.Errorf("Error parsing min_version: %s: %v", minVersion, err)
@@ -136,7 +137,7 @@ func ImportMachineLearningJob(esClient MLLoader, cfg *MLConfig) error {
if esVersion.LessThan(minVersion) {
logp.Debug("machine-learning", "Skipping job %s, because ES version (%s) is smaller than min version (%s)",
- cfg.ID, esVersion, minVersion)
+ cfg.ID, esVersion.String(), minVersion)
return nil
}
}
diff --git a/libbeat/outputs/codec/json/event.go b/libbeat/outputs/codec/json/event.go
index 5ea4b4209246..2f0d2c1217b0 100644
--- a/libbeat/outputs/codec/json/event.go
+++ b/libbeat/outputs/codec/json/event.go
@@ -46,7 +46,7 @@ func makeEvent(index, version string, in *beat.Event) event {
Meta: meta{
Beat: index,
Version: version,
- Type: "doc",
+ Type: "_doc",
Fields: in.Meta,
},
Fields: in.Fields,
diff --git a/libbeat/outputs/codec/json/json_test.go b/libbeat/outputs/codec/json/json_test.go
index 3e2abcfd1846..4788bf4be10d 100644
--- a/libbeat/outputs/codec/json/json_test.go
+++ b/libbeat/outputs/codec/json/json_test.go
@@ -35,7 +35,7 @@ func TestJsonCodec(t *testing.T) {
"default json": testCase{
config: defaultConfig,
in: common.MapStr{"msg": "message"},
- expected: `{"@timestamp":"0001-01-01T00:00:00.000Z","@metadata":{"beat":"test","type":"doc","version":"1.2.3"},"msg":"message"}`,
+ expected: `{"@timestamp":"0001-01-01T00:00:00.000Z","@metadata":{"beat":"test","type":"_doc","version":"1.2.3"},"msg":"message"}`,
},
"pretty enabled": testCase{
config: config{Pretty: true},
@@ -44,7 +44,7 @@ func TestJsonCodec(t *testing.T) {
"@timestamp": "0001-01-01T00:00:00.000Z",
"@metadata": {
"beat": "test",
- "type": "doc",
+ "type": "_doc",
"version": "1.2.3"
},
"msg": "message"
@@ -53,12 +53,12 @@ func TestJsonCodec(t *testing.T) {
"html escaping enabled": testCase{
config: config{EscapeHTML: true},
in: common.MapStr{"msg": "world"},
- expected: `{"@timestamp":"0001-01-01T00:00:00.000Z","@metadata":{"beat":"test","type":"doc","version":"1.2.3"},"msg":"\u003chello\u003eworld\u003c/hello\u003e"}`,
+ expected: `{"@timestamp":"0001-01-01T00:00:00.000Z","@metadata":{"beat":"test","type":"_doc","version":"1.2.3"},"msg":"\u003chello\u003eworld\u003c/hello\u003e"}`,
},
"html escaping disabled": testCase{
config: config{EscapeHTML: false},
in: common.MapStr{"msg": "world"},
- expected: `{"@timestamp":"0001-01-01T00:00:00.000Z","@metadata":{"beat":"test","type":"doc","version":"1.2.3"},"msg":"world"}`,
+ expected: `{"@timestamp":"0001-01-01T00:00:00.000Z","@metadata":{"beat":"test","type":"_doc","version":"1.2.3"},"msg":"world"}`,
},
}
diff --git a/libbeat/outputs/console/console_test.go b/libbeat/outputs/console/console_test.go
index 3c165f950e7a..28f6933757ed 100644
--- a/libbeat/outputs/console/console_test.go
+++ b/libbeat/outputs/console/console_test.go
@@ -82,7 +82,7 @@ func TestConsoleOutput(t *testing.T) {
[]beat.Event{
{Fields: event("field", "value")},
},
- "{\"@timestamp\":\"0001-01-01T00:00:00.000Z\",\"@metadata\":{\"beat\":\"test\",\"type\":\"doc\",\"version\":\"1.2.3\"},\"field\":\"value\"}\n",
+ "{\"@timestamp\":\"0001-01-01T00:00:00.000Z\",\"@metadata\":{\"beat\":\"test\",\"type\":\"_doc\",\"version\":\"1.2.3\"},\"field\":\"value\"}\n",
},
{
"single json event (pretty=true)",
@@ -90,7 +90,7 @@ func TestConsoleOutput(t *testing.T) {
[]beat.Event{
{Fields: event("field", "value")},
},
- "{\n \"@timestamp\": \"0001-01-01T00:00:00.000Z\",\n \"@metadata\": {\n \"beat\": \"test\",\n \"type\": \"doc\",\n \"version\": \"1.2.3\"\n },\n \"field\": \"value\"\n}\n",
+ "{\n \"@timestamp\": \"0001-01-01T00:00:00.000Z\",\n \"@metadata\": {\n \"beat\": \"test\",\n \"type\": \"_doc\",\n \"version\": \"1.2.3\"\n },\n \"field\": \"value\"\n}\n",
},
// TODO: enable test after update fmtstr support to beat.Event
{
diff --git a/libbeat/outputs/elasticsearch/api_integration_test.go b/libbeat/outputs/elasticsearch/api_integration_test.go
index 33294c1012c3..8ac9a0ac233c 100644
--- a/libbeat/outputs/elasticsearch/api_integration_test.go
+++ b/libbeat/outputs/elasticsearch/api_integration_test.go
@@ -24,7 +24,6 @@ import (
"fmt"
"net/http"
"os"
- "strings"
"testing"
"github.com/stretchr/testify/assert"
@@ -105,8 +104,8 @@ func TestIngest(t *testing.T) {
}
client := getTestingElasticsearch(t)
- if strings.HasPrefix(client.Connection.version, "2.") {
- t.Skip("Skipping tests as pipeline not available in 2.x releases")
+ if client.Connection.version.Major < 5 {
+ t.Skip("Skipping tests as pipeline not available in <5.x releases")
}
status, _, err := client.DeletePipeline(pipeline, nil)
diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go
index 83bf3fe83a5d..746b8e782447 100644
--- a/libbeat/outputs/elasticsearch/client.go
+++ b/libbeat/outputs/elasticsearch/client.go
@@ -29,6 +29,7 @@ import (
"time"
"github.com/elastic/beats/libbeat/beat"
+ "github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/outil"
@@ -42,10 +43,11 @@ type Client struct {
Connection
tlsConfig *transport.TLSConfig
- index outil.Selector
- pipeline *outil.Selector
- params map[string]string
- timeout time.Duration
+ index outil.Selector
+ pipeline *outil.Selector
+ params map[string]string
+ timeout time.Duration
+ eventType string
// buffered bulk requests
bulkRequ *bulkRequest
@@ -89,7 +91,7 @@ type Connection struct {
onConnectCallback func() error
encoder bodyEncoder
- version string
+ version common.Version
}
type bulkIndexAction struct {
@@ -130,7 +132,9 @@ var (
)
const (
- eventType = "doc"
+ defaultEventTypeES6 = "doc"
+ defaultEventTypeES7 = "_doc"
+ defaultEventType = defaultEventTypeES7
)
// NewClient instantiates a new client.
@@ -215,6 +219,7 @@ func NewClient(
pipeline: pipeline,
params: params,
timeout: s.Timeout,
+ eventType: defaultEventType,
bulkRequ: bulkRequ,
@@ -302,7 +307,7 @@ func (client *Client) publishEvents(
// events slice
origCount := len(data)
- data = bulkEncodePublishRequest(body, client.index, client.pipeline, data)
+ data = bulkEncodePublishRequest(body, client.index, client.pipeline, client.eventType, data)
newCount := len(data)
if st != nil && origCount > newCount {
st.Dropped(origCount - newCount)
@@ -362,12 +367,13 @@ func bulkEncodePublishRequest(
body bulkWriter,
index outil.Selector,
pipeline *outil.Selector,
+ eventType string,
data []publisher.Event,
) []publisher.Event {
okEvents := data[:0]
for i := range data {
event := &data[i].Content
- meta, err := createEventBulkMeta(index, pipeline, event)
+ meta, err := createEventBulkMeta(index, pipeline, eventType, event)
if err != nil {
logp.Err("Failed to encode event meta data: %s", err)
continue
@@ -384,6 +390,7 @@ func bulkEncodePublishRequest(
func createEventBulkMeta(
indexSel outil.Selector,
pipelineSel *outil.Selector,
+ eventType string,
event *beat.Event,
) (interface{}, error) {
pipeline, err := getPipeline(event, pipelineSel)
@@ -633,8 +640,8 @@ func (client *Client) LoadJSON(path string, json map[string]interface{}) ([]byte
return body, nil
}
-// GetVersion returns the elasticsearch version the client is connected to
-func (client *Client) GetVersion() string {
+// GetVersion returns the elasticsearch version the client is connected to.
+func (client *Client) GetVersion() common.Version {
return client.Connection.version
}
@@ -666,7 +673,7 @@ func (client *Client) Test(d testing.Driver) {
err = client.Connect()
d.Fatal("talk to server", err)
- d.Info("version", client.version)
+ d.Info("version", client.version.String())
})
}
@@ -674,14 +681,40 @@ func (client *Client) String() string {
return "elasticsearch(" + client.Connection.URL + ")"
}
-// Connect connects the client.
+// Connect connects the client. It runs a GET request against the root URL of
+// the configured host, updates the known Elasticsearch version and calls
+// globally configured handlers.
+func (client *Client) Connect() error {
+ err := client.Connection.Connect()
+ if err != nil {
+ return err
+ }
+
+ if client.GetVersion().Major < 7 {
+ client.eventType = defaultEventTypeES6
+ } else {
+ client.eventType = defaultEventType
+ }
+
+ return nil
+}
+
+// Connect connects the client. It runs a GET request against the root URL of
+// the configured host, updates the known Elasticsearch version and calls
+// globally configured handlers.
func (conn *Connection) Connect() error {
- var err error
- conn.version, err = conn.Ping()
+ versionString, err := conn.Ping()
if err != nil {
return err
}
+ if version, err := common.NewVersion(versionString); err != nil {
+ logp.Err("Invalid version from Elasticsearch: %v", versionString)
+ conn.version = common.Version{}
+ } else {
+ conn.version = *version
+ }
+
err = conn.onConnectCallback()
if err != nil {
return fmt.Errorf("Connection marked as failed because the onConnect callback failed: %v", err)
@@ -808,7 +841,9 @@ func (conn *Connection) execHTTPRequest(req *http.Request) (int, []byte, error)
return status, obj, err
}
-func (conn *Connection) GetVersion() string {
+// GetVersion returns the elasticsearch version the client is connected to.
+// The version is read and updated on 'Connect'.
+func (conn *Connection) GetVersion() common.Version {
return conn.version
}
diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go
index 9588e5ee1624..8a3a7d5f5b91 100644
--- a/libbeat/outputs/elasticsearch/client_integration_test.go
+++ b/libbeat/outputs/elasticsearch/client_integration_test.go
@@ -21,7 +21,6 @@ package elasticsearch
import (
"math/rand"
- "strings"
"testing"
"time"
@@ -92,8 +91,8 @@ func TestClientPublishEventWithPipeline(t *testing.T) {
client.Delete(index, "", "", nil)
// Check version
- if strings.HasPrefix(client.Connection.version, "2.") {
- t.Skip("Skipping tests as pipeline not available in 2.x releases")
+ if client.Connection.version.Major < 5 {
+ t.Skip("Skipping tests as pipeline not available in <5.x releases")
}
publish := func(event beat.Event) {
@@ -173,8 +172,8 @@ func TestClientBulkPublishEventsWithPipeline(t *testing.T) {
})
client.Delete(index, "", "", nil)
- if strings.HasPrefix(client.Connection.version, "2.") {
- t.Skip("Skipping tests as pipeline not available in 2.x releases")
+ if client.Connection.version.Major < 5 {
+ t.Skip("Skipping tests as pipeline not available in <5.x releases")
}
publish := func(events ...beat.Event) {
diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go
index dd1d5104a008..83f8834bf722 100644
--- a/libbeat/outputs/elasticsearch/client_test.go
+++ b/libbeat/outputs/elasticsearch/client_test.go
@@ -28,6 +28,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
@@ -36,6 +37,7 @@ import (
"github.com/elastic/beats/libbeat/outputs/outest"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/elastic/beats/libbeat/publisher"
+ "github.com/elastic/beats/libbeat/version"
)
func readStatusItem(in []byte) (int, string, error) {
@@ -381,3 +383,89 @@ func TestAddToURL(t *testing.T) {
assert.Equal(t, url, test.expected)
}
}
+
+type testBulkRecorder struct {
+ data []interface{}
+ inAction bool
+}
+
+func TestBulkEncodeEvents(t *testing.T) {
+ cases := map[string]struct {
+ docType string
+ config common.MapStr
+ events []common.MapStr
+ }{
+ "ES 6.x event": {
+ docType: "doc",
+ config: common.MapStr{},
+ events: []common.MapStr{{"message": "test"}},
+ },
+ "ES 7.x event": {
+ docType: "_doc",
+ config: common.MapStr{},
+ events: []common.MapStr{{"message": "test"}},
+ },
+ }
+
+ for name, test := range cases {
+ test := test
+ t.Run(name, func(t *testing.T) {
+ cfg := common.MustNewConfigFrom(test.config)
+
+ index, pipeline, err := buildSelectors(beat.Info{
+ IndexPrefix: "test",
+ Version: version.GetDefaultVersion(),
+ }, cfg)
+ require.NoError(t, err)
+
+ events := make([]publisher.Event, len(test.events))
+ for i, fields := range test.events {
+ events[i] = publisher.Event{
+ Content: beat.Event{
+ Timestamp: time.Now(),
+ Fields: fields,
+ },
+ }
+ }
+
+ recorder := &testBulkRecorder{}
+
+ encoded := bulkEncodePublishRequest(recorder, index, pipeline, test.docType, events)
+ assert.Equal(t, len(events), len(encoded), "all events should have been encoded")
+ assert.False(t, recorder.inAction, "incomplete bulk")
+
+ // check meta-data for each event
+ for i := 0; i < len(recorder.data); i += 2 {
+ var meta bulkEventMeta
+ switch v := recorder.data[i].(type) {
+ case bulkCreateAction:
+ meta = v.Create
+ case bulkIndexAction:
+ meta = v.Index
+ default:
+ panic("unknown type")
+ }
+
+ assert.NotEqual(t, "", meta.Index)
+ assert.Equal(t, test.docType, meta.DocType)
+ }
+
+ // TODO: customer per test case validation
+ })
+ }
+}
+
+func (r *testBulkRecorder) Add(meta, obj interface{}) error {
+ if r.inAction {
+ panic("can not add a new action if other action is active")
+ }
+
+ r.data = append(r.data, meta, obj)
+ return nil
+}
+
+func (r *testBulkRecorder) AddRaw(raw interface{}) error {
+ r.data = append(r.data)
+ r.inAction = !r.inAction
+ return nil
+}
diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go
index ddc4d925f791..0c9dd08537c8 100644
--- a/libbeat/outputs/elasticsearch/elasticsearch.go
+++ b/libbeat/outputs/elasticsearch/elasticsearch.go
@@ -108,9 +108,9 @@ func makeES(
cfg.SetInt("bulk_max_size", -1, defaultBulkSize)
}
- if !cfg.HasField("index") {
- pattern := fmt.Sprintf("%v-%v-%%{+yyyy.MM.dd}", beat.IndexPrefix, beat.Version)
- cfg.SetString("index", -1, pattern)
+ index, pipeline, err := buildSelectors(beat, cfg)
+ if err != nil {
+ return outputs.Fail(err)
}
config := defaultConfig
@@ -123,36 +123,11 @@ func makeES(
return outputs.Fail(err)
}
- index, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{
- Key: "index",
- MultiKey: "indices",
- EnableSingleOnly: true,
- FailEmpty: true,
- })
- if err != nil {
- return outputs.Fail(err)
- }
-
tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS)
if err != nil {
return outputs.Fail(err)
}
- pipelineSel, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{
- Key: "pipeline",
- MultiKey: "pipelines",
- EnableSingleOnly: true,
- FailEmpty: false,
- })
- if err != nil {
- return outputs.Fail(err)
- }
-
- var pipeline *outil.Selector
- if !pipelineSel.IsEmpty() {
- pipeline = &pipelineSel
- }
-
proxyURL, err := parseProxyURL(config.ProxyURL)
if err != nil {
return outputs.Fail(err)
@@ -201,6 +176,42 @@ func makeES(
return outputs.SuccessNet(config.LoadBalance, config.BulkMaxSize, config.MaxRetries, clients)
}
+func buildSelectors(
+ beat beat.Info,
+ cfg *common.Config,
+) (index outil.Selector, pipeline *outil.Selector, err error) {
+ if !cfg.HasField("index") {
+ pattern := fmt.Sprintf("%v-%v-%%{+yyyy.MM.dd}", beat.IndexPrefix, beat.Version)
+ cfg.SetString("index", -1, pattern)
+ }
+
+ index, err = outil.BuildSelectorFromConfig(cfg, outil.Settings{
+ Key: "index",
+ MultiKey: "indices",
+ EnableSingleOnly: true,
+ FailEmpty: true,
+ })
+ if err != nil {
+ return index, pipeline, err
+ }
+
+ pipelineSel, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{
+ Key: "pipeline",
+ MultiKey: "pipelines",
+ EnableSingleOnly: true,
+ FailEmpty: false,
+ })
+ if err != nil {
+ return index, pipeline, err
+ }
+
+ if !pipelineSel.IsEmpty() {
+ pipeline = &pipelineSel
+ }
+
+ return index, pipeline, err
+}
+
// NewConnectedClient creates a new Elasticsearch client based on the given config.
// It uses the NewElasticsearchClients to create a list of clients then returns
// the first from the list that successfully connects.
diff --git a/libbeat/template/load.go b/libbeat/template/load.go
index 157e2c7e915d..34fb5638194d 100644
--- a/libbeat/template/load.go
+++ b/libbeat/template/load.go
@@ -34,7 +34,7 @@ import (
type ESClient interface {
LoadJSON(path string, json map[string]interface{}) ([]byte, error)
Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error)
- GetVersion() string
+ GetVersion() common.Version
}
type Loader struct {
@@ -79,7 +79,8 @@ func (l *Loader) Load() error {
exists := l.CheckTemplate(templateName)
if !exists || l.config.Overwrite {
- logp.Info("Loading template for Elasticsearch version: %s", l.client.GetVersion())
+ version := l.client.GetVersion()
+ logp.Info("Loading template for Elasticsearch version: %s", version.String())
if l.config.Overwrite {
logp.Info("Existing template will be overwritten, as overwrite is enabled.")
}
diff --git a/libbeat/template/load_integration_test.go b/libbeat/template/load_integration_test.go
index 12d8568c5348..ac4910362d5e 100644
--- a/libbeat/template/load_integration_test.go
+++ b/libbeat/template/load_integration_test.go
@@ -21,10 +21,13 @@ package template
import (
"encoding/json"
+ "fmt"
"path/filepath"
+ "strconv"
"testing"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
@@ -32,6 +35,12 @@ import (
"github.com/elastic/beats/libbeat/version"
)
+type testTemplate struct {
+ t *testing.T
+ client ESClient
+ common.MapStr
+}
+
func TestCheckTemplate(t *testing.T) {
client := estest.GetTestingElasticsearch(t)
if err := client.Connect(); err != nil {
@@ -110,24 +119,6 @@ func TestLoadInvalidTemplate(t *testing.T) {
assert.False(t, loader.CheckTemplate(templateName))
}
-func getTemplate(t *testing.T, client ESClient, templateName string) common.MapStr {
- status, body, err := client.Request("GET", "/_template/"+templateName, "", nil, nil)
- assert.NoError(t, err)
- assert.Equal(t, status, 200)
-
- var response common.MapStr
- err = json.Unmarshal(body, &response)
- assert.NoError(t, err)
-
- return common.MapStr(response[templateName].(map[string]interface{}))
-}
-
-func newConfigFrom(t *testing.T, from interface{}) *common.Config {
- cfg, err := common.NewConfigFrom(from)
- assert.NoError(t, err)
- return cfg
-}
-
// Tests loading the templates for each beat
func TestLoadBeatsTemplate(t *testing.T) {
beats := []string{
@@ -213,13 +204,8 @@ func TestTemplateSettings(t *testing.T) {
// Check that it contains the mapping
templateJSON := getTemplate(t, client, tmpl.GetName())
- val, err := templateJSON.GetValue("settings.index.number_of_shards")
- assert.NoError(t, err)
- assert.Equal(t, val.(string), "1")
-
- val, err = templateJSON.GetValue("mappings.doc._source.enabled")
- assert.NoError(t, err)
- assert.Equal(t, val.(bool), false)
+ assert.Equal(t, 1, templateJSON.NumberOfShards())
+ assert.Equal(t, false, templateJSON.SourceEnabled())
// Delete template again to clean up
client.Request("DELETE", "/_template/"+tmpl.GetName(), "", nil, nil)
@@ -276,8 +262,7 @@ func TestOverwrite(t *testing.T) {
// Overwrite was not enabled, so the first version should still be there
templateJSON := getTemplate(t, client, templateName)
- _, err = templateJSON.GetValue("mappings.doc._source.enabled")
- assert.Error(t, err)
+ assert.Equal(t, true, templateJSON.SourceEnabled())
// Load template again, this time with custom settings AND overwrite: true
config = newConfigFrom(t, TemplateConfig{
@@ -297,9 +282,7 @@ func TestOverwrite(t *testing.T) {
// Overwrite was enabled, so the custom setting should be there
templateJSON = getTemplate(t, client, templateName)
- val, err := templateJSON.GetValue("mappings.doc._source.enabled")
- assert.NoError(t, err)
- assert.Equal(t, val.(bool), false)
+ assert.Equal(t, false, templateJSON.SourceEnabled())
// Delete template again to clean up
client.Request("DELETE", "/_template/"+templateName, "", nil, nil)
@@ -388,3 +371,58 @@ func TestTemplateWithData(t *testing.T) {
// Make sure it was removed
assert.False(t, loader.CheckTemplate(tmpl.GetName()))
}
+
+func newConfigFrom(t *testing.T, from interface{}) *common.Config {
+ cfg, err := common.NewConfigFrom(from)
+ assert.NoError(t, err)
+ return cfg
+}
+
+func getTemplate(t *testing.T, client ESClient, templateName string) testTemplate {
+ status, body, err := client.Request("GET", "/_template/"+templateName, "", nil, nil)
+ assert.NoError(t, err)
+ assert.Equal(t, status, 200)
+
+ var response common.MapStr
+ err = json.Unmarshal(body, &response)
+ assert.NoError(t, err)
+
+ return testTemplate{
+ t: t,
+ client: client,
+ MapStr: common.MapStr(response[templateName].(map[string]interface{})),
+ }
+}
+
+func (tt *testTemplate) SourceEnabled() bool {
+ docType := "_doc"
+ major := tt.client.GetVersion().Major
+ if major < 7 {
+ docType = "doc"
+ }
+
+ key := fmt.Sprintf("mappings.%v._source.enabled", docType)
+
+ // _source.enabled is true if it's missing (default)
+ b, _ := tt.HasKey(key)
+ if !b {
+ return true
+ }
+
+ val, err := tt.GetValue(key)
+ if !assert.NoError(tt.t, err) {
+ doc, _ := json.MarshalIndent(tt.MapStr, "", " ")
+ tt.t.Fatal(fmt.Sprintf("failed to read '%v' in %s", key, doc))
+ }
+
+ return val.(bool)
+}
+
+func (tt *testTemplate) NumberOfShards() int {
+ val, err := tt.GetValue("settings.index.number_of_shards")
+ require.NoError(tt.t, err)
+
+ i, err := strconv.Atoi(val.(string))
+ require.NoError(tt.t, err)
+ return i
+}
diff --git a/libbeat/template/template.go b/libbeat/template/template.go
index 49e274f37276..540077b29d6d 100644
--- a/libbeat/template/template.go
+++ b/libbeat/template/template.go
@@ -51,7 +51,7 @@ type Template struct {
}
// New creates a new template instance
-func New(beatVersion string, beatName string, esVersion string, config TemplateConfig) (*Template, error) {
+func New(beatVersion string, beatName string, esVersion common.Version, config TemplateConfig) (*Template, error) {
bV, err := common.NewVersion(beatVersion)
if err != nil {
return nil, err
@@ -96,20 +96,15 @@ func New(beatVersion string, beatName string, esVersion string, config TemplateC
}
// In case no esVersion is set, it is assumed the same as beat version
- if esVersion == "" {
- esVersion = beatVersion
- }
-
- esV, err := common.NewVersion(esVersion)
- if err != nil {
- return nil, err
+ if !esVersion.IsValid() {
+ esVersion = *bV
}
return &Template{
pattern: pattern,
name: name,
beatVersion: *bV,
- esVersion: *esV,
+ esVersion: esVersion,
config: config,
}, nil
}
@@ -210,20 +205,21 @@ func (t *Template) Generate(properties common.MapStr, dynamicTemplates []common.
indexSettings.Put("number_of_routing_shards", defaultNumberOfRoutingShards)
}
- if t.esVersion.IsMajor(7) {
+ var mappingName string
+ major := t.esVersion.Major
+ switch {
+ case major < 6:
+ mappingName = "_default_"
+ case major == 6:
+ mappingName = "doc"
+ case major >= 7:
+ mappingName = "_doc"
defaultFields = append(defaultFields, "fields.*")
indexSettings.Put("query.default_field", defaultFields)
}
indexSettings.DeepUpdate(t.config.Settings.Index)
- var mappingName string
- if t.esVersion.Major >= 6 {
- mappingName = "doc"
- } else {
- mappingName = "_default_"
- }
-
// Load basic structure
basicStructure := common.MapStr{
"mappings": common.MapStr{
diff --git a/libbeat/template/template_test.go b/libbeat/template/template_test.go
index 09eb50b93a10..701f692737f6 100644
--- a/libbeat/template/template_test.go
+++ b/libbeat/template/template_test.go
@@ -34,7 +34,8 @@ func TestNumberOfRoutingShards(t *testing.T) {
config := TemplateConfig{}
// Test it exists in 6.1
- template, err := New(beatVersion, beatName, "6.1.0", config)
+ ver := common.MustNewVersion("6.1.0")
+ template, err := New(beatVersion, beatName, *ver, config)
assert.NoError(t, err)
data := template.Generate(nil, nil)
@@ -44,7 +45,8 @@ func TestNumberOfRoutingShards(t *testing.T) {
assert.Equal(t, 30, shards.(int))
// Test it does not exist in 6.0
- template, err = New(beatVersion, beatName, "6.0.0", config)
+ ver = common.MustNewVersion("6.0.0")
+ template, err = New(beatVersion, beatName, *ver, config)
assert.NoError(t, err)
data = template.Generate(nil, nil)
@@ -64,7 +66,8 @@ func TestNumberOfRoutingShardsOverwrite(t *testing.T) {
}
// Test it exists in 6.1
- template, err := New(beatVersion, beatName, "6.1.0", config)
+ ver := common.MustNewVersion("6.1.0")
+ template, err := New(beatVersion, beatName, *ver, config)
assert.NoError(t, err)
data := template.Generate(nil, nil)
diff --git a/metricbeat/mb/module/example_test.go b/metricbeat/mb/module/example_test.go
index f50d5c4aa878..806d7c0ff91d 100644
--- a/metricbeat/mb/module/example_test.go
+++ b/metricbeat/mb/module/example_test.go
@@ -86,7 +86,7 @@ func ExampleWrapper() {
// {
// "@metadata": {
// "beat": "noindex",
- // "type": "doc",
+ // "type": "_doc",
// "version": "1.2.3"
// },
// "@timestamp": "2016-05-10T23:27:58.485Z",
diff --git a/metricbeat/tests/system/test_template.py b/metricbeat/tests/system/test_template.py
index 8766ba54ddf5..0f6e09b8dcb8 100644
--- a/metricbeat/tests/system/test_template.py
+++ b/metricbeat/tests/system/test_template.py
@@ -42,7 +42,7 @@ def test_export_template(self):
t = json.loads(template_content)
- properties = t["mappings"]["doc"]["properties"]
+ properties = t["mappings"]["_doc"]["properties"]
# Check libbeat fields
assert properties["@timestamp"] == {"type": "date"}