Skip to content

Commit

Permalink
Introduce beat version in index and template (#3527)
Browse files Browse the repository at this point in the history
This adds the beat version to the index and the template. The advantage of this is that an index always has the correct template applied based on the beats version. Currently during upgrade it must be waited until the next day. Only then the new template is applied. Also it helps in case different versions of beats are running in parallel. Each beat puts its data into the correct index.

The index name is now `beatname-%{[beat.version]}-%{+yyyy.MM.dd}` by default.

This should not have any affects on the dashboards or Kibana in general as `beatname-*` still applies to all data. Are there some potential side affects of this?
  • Loading branch information
ruflin authored and Steffen Siering committed Feb 21, 2017
1 parent 41f75de commit eb3755d
Show file tree
Hide file tree
Showing 31 changed files with 66 additions and 49 deletions.
4 changes: 3 additions & 1 deletion filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ output.elasticsearch:

# Optional index name. The default is "filebeat" plus date
# and generates [filebeat-]YYYY.MM.DD keys.
#index: "filebeat-%{+yyyy.MM.dd}"
#index: "filebeat-%{[beat.version]}-%{+yyyy.MM.dd}"

# Optional ingest node pipeline. By default no pipeline will be used.
#pipeline: ""
Expand Down Expand Up @@ -494,6 +494,8 @@ output.elasticsearch:
#template.enabled: true

# Template name. By default the template name is filebeat.
# The version of the beat will always be appended to the given name
# so the final name is filebeat-%{[beat.version]}.
#template.name: "filebeat"

# Path to template file
Expand Down
2 changes: 1 addition & 1 deletion filebeat/filebeat.template-es2x.json
Original file line number Diff line number Diff line change
Expand Up @@ -537,5 +537,5 @@
"settings": {
"index.refresh_interval": "5s"
},
"template": "filebeat-*"
"template": "filebeat-6.0.0-alpha1-*"
}
2 changes: 1 addition & 1 deletion filebeat/filebeat.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -458,5 +458,5 @@
"index.mapping.total_fields.limit": 10000,
"index.refresh_interval": "5s"
},
"template": "filebeat-*"
"template": "filebeat-6.0.0-alpha1-*"
}
4 changes: 3 additions & 1 deletion heartbeat/heartbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ output.elasticsearch:

# Optional index name. The default is "heartbeat" plus date
# and generates [heartbeat-]YYYY.MM.DD keys.
#index: "heartbeat-%{+yyyy.MM.dd}"
#index: "heartbeat-%{[beat.version]}-%{+yyyy.MM.dd}"

# Optional ingest node pipeline. By default no pipeline will be used.
#pipeline: ""
Expand Down Expand Up @@ -342,6 +342,8 @@ output.elasticsearch:
#template.enabled: true

# Template name. By default the template name is heartbeat.
# The version of the beat will always be appended to the given name
# so the final name is heartbeat-%{[beat.version]}.
#template.name: "heartbeat"

# Path to template file
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/heartbeat.template-es2x.json
Original file line number Diff line number Diff line change
Expand Up @@ -215,5 +215,5 @@
"settings": {
"index.refresh_interval": "5s"
},
"template": "heartbeat-*"
"template": "heartbeat-6.0.0-alpha1-*"
}
2 changes: 1 addition & 1 deletion heartbeat/heartbeat.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -188,5 +188,5 @@
"index.mapping.total_fields.limit": 10000,
"index.refresh_interval": "5s"
},
"template": "heartbeat-*"
"template": "heartbeat-6.0.0-alpha1-*"
}
4 changes: 3 additions & 1 deletion libbeat/_meta/config.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ output.elasticsearch:

# Optional index name. The default is "beatname" plus date
# and generates [beatname-]YYYY.MM.DD keys.
#index: "beatname-%{+yyyy.MM.dd}"
#index: "beatname-%{[beat.version]}-%{+yyyy.MM.dd}"

# Optional ingest node pipeline. By default no pipeline will be used.
#pipeline: ""
Expand Down Expand Up @@ -144,6 +144,8 @@ output.elasticsearch:
#template.enabled: true

# Template name. By default the template name is beatname.
# The version of the beat will always be appended to the given name
# so the final name is beatname-%{[beat.version]}.
#template.name: "beatname"

# Path to template file
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type console struct {
codec outputs.Codec
}

func New(_ string, config *common.Config, _ int) (outputs.Outputer, error) {
func New(_ common.BeatInfo, config *common.Config, _ int) (outputs.Outputer, error) {
var unpackedConfig Config
err := config.Unpack(&unpackedConfig)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions libbeat/outputs/elasticsearch/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func TestOutputLoadTemplate(t *testing.T) {
t.Fatal(err)
}

output, err := New("libbeat", cfg, 0)
output, err := New(common.BeatInfo{Beat: "libbeat"}, cfg, 0)
if err != nil {
t.Fatal(err)
}
Expand All @@ -190,6 +190,9 @@ func TestOutputLoadTemplate(t *testing.T) {
"host": "test-host",
"type": "libbeat",
"message": "Test message from libbeat",
"beat": common.MapStr{
"version": "1.2.3",
},
}}

err = output.PublishEvent(nil, outputs.Options{Guaranteed: true}, event)
Expand Down Expand Up @@ -423,7 +426,7 @@ func connectTestEs(t *testing.T, cfg interface{}) (outputs.BulkOutputer, *Client
t.Fatal(err)
}

output, err := New("libbeat", config, 0)
output, err := New(common.BeatInfo{Beat: "libbeat"}, config, 0)
if err != nil {
t.Fatal(err)
}
Expand Down
14 changes: 7 additions & 7 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

type elasticsearchOutput struct {
index outil.Selector
beatName string
beat common.BeatInfo
pipeline *outil.Selector

mode mode.ConnectionMode
Expand Down Expand Up @@ -54,17 +54,17 @@ var (
)

// New instantiates a new output plugin instance publishing to elasticsearch.
func New(beatName string, cfg *common.Config, topologyExpire int) (outputs.Outputer, error) {
func New(beat common.BeatInfo, cfg *common.Config, topologyExpire int) (outputs.Outputer, error) {
if !cfg.HasField("bulk_max_size") {
cfg.SetInt("bulk_max_size", -1, defaultBulkSize)
}

if !cfg.HasField("index") {
pattern := fmt.Sprintf("%v-%%{+yyyy.MM.dd}", beatName)
pattern := fmt.Sprintf("%v-%v-%%{+yyyy.MM.dd}", beat.Beat, beat.Version)
cfg.SetString("index", -1, pattern)
}

output := &elasticsearchOutput{beatName: beatName}
output := &elasticsearchOutput{beat: beat}
err := output.init(cfg, topologyExpire)
if err != nil {
return nil, err
Expand Down Expand Up @@ -238,13 +238,13 @@ func (out *elasticsearchOutput) readTemplate(config *Template) error {
if config.Enabled {
// Set the defaults that depend on the beat name
if config.Name == "" {
config.Name = out.beatName
config.Name = out.beat.Beat + "-" + out.beat.Version
}
if config.Path == "" {
config.Path = fmt.Sprintf("%s.template.json", out.beatName)
config.Path = fmt.Sprintf("%s.template.json", out.beat.Beat)
}
if config.Versions.Es2x.Path == "" {
config.Versions.Es2x.Path = fmt.Sprintf("%s.template-es2x.json", out.beatName)
config.Versions.Es2x.Path = fmt.Sprintf("%s.template-es2x.json", out.beat.Beat)
}

// Look for the template in the configuration path, if it's not absolute
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func createElasticsearchConnection(flushInterval int, bulkSize int) *elasticsear
"template.enabled": false,
})

output := &elasticsearchOutput{beatName: "test"}
output := &elasticsearchOutput{beat: common.BeatInfo{Beat: "test"}}
output.init(config, 10)
return output
}
Expand Down
12 changes: 6 additions & 6 deletions libbeat/outputs/fileout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ func init() {
}

type fileOutput struct {
beatName string
rotator logp.FileRotator
codec outputs.Codec
beat common.BeatInfo
rotator logp.FileRotator
codec outputs.Codec
}

// New instantiates a new file output instance.
func New(beatName string, cfg *common.Config, _ int) (outputs.Outputer, error) {
func New(beat common.BeatInfo, cfg *common.Config, _ int) (outputs.Outputer, error) {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, err
Expand All @@ -28,7 +28,7 @@ func New(beatName string, cfg *common.Config, _ int) (outputs.Outputer, error) {
cfg.SetInt("flush_interval", -1, -1)
cfg.SetInt("bulk_max_size", -1, -1)

output := &fileOutput{beatName: beatName}
output := &fileOutput{beat: beat}
if err := output.init(config); err != nil {
return nil, err
}
Expand All @@ -41,7 +41,7 @@ func (out *fileOutput) init(config config) error {
out.rotator.Path = config.Path
out.rotator.Name = config.Filename
if out.rotator.Name == "" {
out.rotator.Name = out.beatName
out.rotator.Name = out.beat.Beat
}

codec, err := outputs.CreateEncoder(config.Codec)
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ var (
)

// New instantiates a new kafka output instance.
func New(beatName string, cfg *common.Config, topologyExpire int) (outputs.Outputer, error) {
func New(_ common.BeatInfo, cfg *common.Config, topologyExpire int) (outputs.Outputer, error) {
output := &kafka{}
err := output.init(cfg)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func TestKafkaPublish(t *testing.T) {
// create output within function scope to guarantee
// output is properly closed between single tests
func() {
tmp, err := New("libbeat", cfg, 0)
tmp, err := New(common.BeatInfo{Beat: "libbeat"}, cfg, 0)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ func init() {
outputs.RegisterOutputPlugin("logstash", new)
}

func new(beatName string, cfg *common.Config, _ int) (outputs.Outputer, error) {
func new(beat common.BeatInfo, cfg *common.Config, _ int) (outputs.Outputer, error) {

if !cfg.HasField("index") {
cfg.SetString("index", -1, beatName)
cfg.SetString("index", -1, beat.Beat)
}

output := &logstash{}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/logstash/logstash_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func newTestElasticsearchOutput(t *testing.T, test string) *testOutputer {
"template.enabled": false,
})

output, err := plugin("libbeat", config, 10)
output, err := plugin(common.BeatInfo{Beat: "libbeat"}, config, 10)
if err != nil {
t.Fatalf("init elasticsearch output plugin failed: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/logstash/logstash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func newTestLumberjackOutput(
}

cfg, _ := common.NewConfigFrom(config)
output, err := plugin("", cfg, 0)
output, err := plugin(common.BeatInfo{}, cfg, 0)
if err != nil {
t.Fatalf("init logstash output plugin failed: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions libbeat/outputs/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type BulkOutputer interface {
}

// Create and initialize the output plugin
type OutputBuilder func(beatName string, config *common.Config, topologyExpire int) (Outputer, error)
type OutputBuilder func(beat common.BeatInfo, config *common.Config, topologyExpire int) (Outputer, error)

// Functions to be exported by a output plugin
type OutputInterface interface {
Expand Down Expand Up @@ -90,7 +90,7 @@ func FindOutputPlugin(name string) OutputBuilder {
}

func InitOutputs(
beatName string,
beat common.BeatInfo,
configs map[string]*common.Config,
topologyExpire int,
) ([]OutputPlugin, error) {
Expand All @@ -106,7 +106,7 @@ func InitOutputs(
continue
}

output, err := plugin(beatName, config, topologyExpire)
output, err := plugin(beat, config, topologyExpire)
if err != nil {
logp.Err("failed to initialize %s plugin as output: %s", name, err)
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions libbeat/outputs/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
type redisOut struct {
mode mode.ConnectionMode
topology
beatName string
beat common.BeatInfo
}

var debugf = logp.MakeDebug("redis")
Expand All @@ -40,8 +40,8 @@ func init() {
outputs.RegisterOutputPlugin("redis", new)
}

func new(beatName string, cfg *common.Config, expireTopo int) (outputs.Outputer, error) {
r := &redisOut{beatName: beatName}
func new(beat common.BeatInfo, cfg *common.Config, expireTopo int) (outputs.Outputer, error) {
r := &redisOut{beat: beat}
if err := r.init(cfg, expireTopo); err != nil {
return nil, err
}
Expand Down Expand Up @@ -80,7 +80,7 @@ func (r *redisOut) init(cfg *common.Config, expireTopo int) error {
}
}
if !cfg.HasField("key") {
cfg.SetString("key", -1, r.beatName)
cfg.SetString("key", -1, r.beat.Beat)
}

key, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/redis/redis_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func newRedisTestingOutput(t *testing.T, cfg map[string]interface{}) *redisOut {
t.Fatalf("Failed to unpack topology_expire: %v", err)
}

out, err := plugin("libbeat", config, params.Expire)
out, err := plugin(common.BeatInfo{Beat: "libbeat"}, config, params.Expire)
if err != nil {
t.Fatalf("Failed to initialize redis output: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion libbeat/publisher/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ func (publisher *BeatPublisher) init(
publisher.wsOutput.Init()

if !publisher.disabled {
plugins, err := outputs.InitOutputs(beat.Beat, configs, shipper.TopologyExpire)
plugins, err := outputs.InitOutputs(beat, configs, shipper.TopologyExpire)

if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion libbeat/scripts/generate_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,4 +377,5 @@ def fill_field_properties(args, field, defaults, path):
version_data = yaml.load(file)

with open(target, 'w') as output:
fields_to_es_template(args, fields, output, args.beatname + "-*", version_data['version'])
fields_to_es_template(args, fields, output, args.beatname + "-" +
version_data['version'] + "-*", version_data['version'])
4 changes: 3 additions & 1 deletion metricbeat/metricbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ output.elasticsearch:

# Optional index name. The default is "metricbeat" plus date
# and generates [metricbeat-]YYYY.MM.DD keys.
#index: "metricbeat-%{+yyyy.MM.dd}"
#index: "metricbeat-%{[beat.version]}-%{+yyyy.MM.dd}"

# Optional ingest node pipeline. By default no pipeline will be used.
#pipeline: ""
Expand Down Expand Up @@ -450,6 +450,8 @@ output.elasticsearch:
#template.enabled: true

# Template name. By default the template name is metricbeat.
# The version of the beat will always be appended to the given name
# so the final name is metricbeat-%{[beat.version]}.
#template.name: "metricbeat"

# Path to template file
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/metricbeat.template-es2x.json
Original file line number Diff line number Diff line change
Expand Up @@ -4186,5 +4186,5 @@
"settings": {
"index.refresh_interval": "5s"
},
"template": "metricbeat-*"
"template": "metricbeat-6.0.0-alpha1-*"
}
2 changes: 1 addition & 1 deletion metricbeat/metricbeat.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -4128,5 +4128,5 @@
"index.mapping.total_fields.limit": 10000,
"index.refresh_interval": "5s"
},
"template": "metricbeat-*"
"template": "metricbeat-6.0.0-alpha1-*"
}
4 changes: 3 additions & 1 deletion packetbeat/packetbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ output.elasticsearch:

# Optional index name. The default is "packetbeat" plus date
# and generates [packetbeat-]YYYY.MM.DD keys.
#index: "packetbeat-%{+yyyy.MM.dd}"
#index: "packetbeat-%{[beat.version]}-%{+yyyy.MM.dd}"

# Optional ingest node pipeline. By default no pipeline will be used.
#pipeline: ""
Expand Down Expand Up @@ -598,6 +598,8 @@ output.elasticsearch:
#template.enabled: true

# Template name. By default the template name is packetbeat.
# The version of the beat will always be appended to the given name
# so the final name is packetbeat-%{[beat.version]}.
#template.name: "packetbeat"

# Path to template file
Expand Down
Loading

0 comments on commit eb3755d

Please sign in to comment.