Skip to content

Commit

Permalink
Add @metadata.version to json events (elastic#5166)
Browse files Browse the repository at this point in the history
* Add @metadata.version to json events

- pass beat.Info into the codecs
- update json encoder to include @metadata.version (for Console, File, Logstash, Redis, Kafka)

* Update Ls docs to use the version field

* Fix test build

* s/second/third/

* Fix metricbeat unit tests
  • Loading branch information
Steffen Siering authored and andrewkroh committed Sep 12, 2017
1 parent 885c2d1 commit 46dbdbd
Show file tree
Hide file tree
Showing 26 changed files with 84 additions and 90 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Changed the hashbang used in the beat helper script from `/bin/bash` to `/usr/bin/env bash`. {pull}5051[5051]
- Changed beat helper script to use `exec` when running the beat. {pull}5051[5051]
- Fix reloader error message to only print on actual error {pull}5066[5066]
- Add @metadata.version to events send to Logstash. {pull}5166[5166]

*Auditbeat*

Expand Down
5 changes: 3 additions & 2 deletions libbeat/docs/gettingstarted.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -296,13 +296,14 @@ output {
elasticsearch {
hosts => "localhost:9200"
manage_template => false
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}" <1>
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" <1>
document_type => "%{[@metadata][type]}" <2>
}
}
------------------------------------------------------------------------------
<1> `%{[@metadata][beat]}` sets the first part of the index name to the value
of the `beat` metadata field, and `%{+YYYY.MM.dd}` sets the second part of the
of the `beat` metadata field, `%{[@metadata][version]}` sets the second part of
the name to the beat's version, and `%{+YYYY.MM.dd}` sets the third part of the
name to a date based on the Logstash `@timestamp` field. For example:
+{beatname_lc}-2017.03.29+.
<2> `%{[@metadata][type]}` sets the document type based on the value of the `type`
Expand Down
17 changes: 9 additions & 8 deletions libbeat/docs/outputconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -322,18 +322,18 @@ use in Logstash for indexing and filtering:
...
"@metadata": { <1>
"beat": "{beatname_lc}", <2>
"type": "doc" <3>
"version": "{stack-version}" <3>
"type": "doc" <4>
}
}
------------------------------------------------------------------------------
<1> {beatname_uc} uses the `@metadata` field to send metadata to Logstash. The
contents of the `@metadata` field only exist in Logstash and are not part of any
events sent from Logstash. See the
<1> {beatname_uc} uses the `@metadata` field to send metadata to Logstash. See the
{logstashdoc}/event-dependent-configuration.html#metadata[Logstash documentation]
for more about the `@metadata` field.
<2> The default is {beatname_lc}. To change this value, set the
<<logstash-index,`index`>> option in the {beatname_uc} config file.
<3> The value of `type` is currently hardcoded to `doc`. It was used by previous
<3> The beats current version.
<4> The value of `type` is currently hardcoded to `doc`. It was used by previous
Logstash configs to set the type of the document in Elasticsearch.


Expand All @@ -359,14 +359,15 @@ input {
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}" <1>
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" <1>
}
}
------------------------------------------------------------------------------
<1> `%{[@metadata][beat]}` sets the first part of the index name to the value
of the `beat` metadata field, and `%{+YYYY.MM.dd}` sets the second part of the
of the `beat` metadata field, `%{[@metadata][version]}` sets the second part to
the beat's version, and `%{+YYYY.MM.dd}` sets the third part of the
name to a date based on the Logstash `@timestamp` field. For example:
+{beatname_lc}-2017.03.29+.
+{beatname_lc}-{stack-version}-2017.03.29+.

Events indexed into Elasticsearch with the Logstash configuration shown here
will be similar to events directly indexed by Beats into Elasticsearch.
Expand Down
1 change: 1 addition & 0 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
monitoring := monitoring.Default.NewRegistry("xpack.monitoring")

pipeline, err := pipeline.New(
beat,
monitoring,
queueFactory, out, pipeline.Settings{
WaitClose: 0,
Expand Down
7 changes: 4 additions & 3 deletions libbeat/outputs/codec/codec_reg.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package codec
import (
"fmt"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

type Factory func(*common.Config) (Codec, error)
type Factory func(beat.Info, *common.Config) (Codec, error)

type Config struct {
Namespace common.ConfigNamespace `config:",inline"`
Expand All @@ -21,7 +22,7 @@ func RegisterType(name string, gen Factory) {
codecs[name] = gen
}

func CreateEncoder(cfg Config) (Codec, error) {
func CreateEncoder(info beat.Info, cfg Config) (Codec, error) {
// default to json codec
codec := "json"
if name := cfg.Namespace.Name(); name != "" {
Expand All @@ -32,5 +33,5 @@ func CreateEncoder(cfg Config) (Codec, error) {
if factory == nil {
return nil, fmt.Errorf("'%v' output codec is not available", codec)
}
return factory(cfg.Namespace.Config())
return factory(info, cfg.Namespace.Config())
}
2 changes: 1 addition & 1 deletion libbeat/outputs/codec/format/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Config struct {
}

func init() {
codec.RegisterType("format", func(cfg *common.Config) (codec.Codec, error) {
codec.RegisterType("format", func(_ beat.Info, cfg *common.Config) (codec.Codec, error) {
config := Config{}
if cfg == nil {
return nil, errors.New("empty format codec configuration")
Expand Down
16 changes: 9 additions & 7 deletions libbeat/outputs/codec/json/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@ type event struct {

// Meta defines common event metadata to be stored in '@metadata'
type meta struct {
Beat string `struct:"beat"`
Type string `struct:"type"`
Fields map[string]interface{} `struct:",inline"`
Beat string `struct:"beat"`
Type string `struct:"type"`
Version string `struct:"version"`
Fields map[string]interface{} `struct:",inline"`
}

func makeEvent(index string, in *beat.Event) event {
func makeEvent(index, version string, in *beat.Event) event {
return event{
Timestamp: in.Timestamp,
Meta: meta{
Beat: index,
Type: "doc",
Fields: in.Meta,
Beat: index,
Version: version,
Type: "doc",
Fields: in.Meta,
},
Fields: in.Fields,
}
Expand Down
21 changes: 13 additions & 8 deletions libbeat/outputs/codec/json/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (
"github.com/elastic/beats/libbeat/outputs/codec"
)

// Encoder for serializing a beat.Event to json.
type Encoder struct {
buf bytes.Buffer
folder *gotype.Iterator
pretty bool
buf bytes.Buffer
folder *gotype.Iterator
pretty bool
version string
}

type config struct {
Expand All @@ -27,20 +29,21 @@ var defaultConfig = config{
}

func init() {
codec.RegisterType("json", func(cfg *common.Config) (codec.Codec, error) {
codec.RegisterType("json", func(info beat.Info, cfg *common.Config) (codec.Codec, error) {
config := defaultConfig
if cfg != nil {
if err := cfg.Unpack(&config); err != nil {
return nil, err
}
}

return New(config.Pretty), nil
return New(config.Pretty, info.Version), nil
})
}

func New(pretty bool) *Encoder {
e := &Encoder{pretty: pretty}
// New creates a new json Encoder.
func New(pretty bool, version string) *Encoder {
e := &Encoder{pretty: pretty, version: version}
e.reset()
return e
}
Expand All @@ -62,9 +65,11 @@ func (e *Encoder) reset() {
}
}

// Encode serializies a beat event to JSON. It adds additional metadata in the
// `@metadata` namespace.
func (e *Encoder) Encode(index string, event *beat.Event) ([]byte, error) {
e.buf.Reset()
err := e.folder.Fold(makeEvent(index, event))
err := e.folder.Fold(makeEvent(index, e.version, event))
if err != nil {
e.reset()
return nil, err
Expand Down
9 changes: 5 additions & 4 deletions libbeat/outputs/codec/json/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
)

func TestJsonCodec(t *testing.T) {
expectedValue := `{"@timestamp":"0001-01-01T00:00:00.000Z","@metadata":{"beat":"test","type":"doc"},"msg":"message"}`
expectedValue := `{"@timestamp":"0001-01-01T00:00:00.000Z","@metadata":{"beat":"test","type":"doc","version":"1.2.3"},"msg":"message"}`

codec := New(false)
codec := New(false, "1.2.3")
output, err := codec.Encode("test", &beat.Event{Fields: common.MapStr{"msg": "message"}})

if err != nil {
Expand All @@ -27,12 +27,13 @@ func TestJsonWriterPrettyPrint(t *testing.T) {
"@timestamp": "0001-01-01T00:00:00.000Z",
"@metadata": {
"beat": "test",
"type": "doc"
"type": "doc",
"version": "1.2.3"
},
"msg": "message"
}`

codec := New(true)
codec := New(true, "1.2.3")
output, err := codec.Encode("test", &beat.Event{Fields: common.MapStr{"msg": "message"}})

if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ func makeConsole(

var enc codec.Codec
if config.Codec.Namespace.IsSet() {
enc, err = codec.CreateEncoder(config.Codec)
enc, err = codec.CreateEncoder(beat, config.Codec)
if err != nil {
return outputs.Fail(err)
}
} else {
enc = json.New(config.Pretty)
enc = json.New(config.Pretty, beat.Version)
}

index := beat.Beat
Expand Down
8 changes: 4 additions & 4 deletions libbeat/outputs/console/console_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,19 @@ func TestConsoleOutput(t *testing.T) {
}{
{
"single json event (pretty=false)",
json.New(false),
json.New(false, "1.2.3"),
[]beat.Event{
{Fields: event("field", "value")},
},
"{\"@timestamp\":\"0001-01-01T00:00:00.000Z\",\"@metadata\":{\"beat\":\"test\",\"type\":\"doc\"},\"field\":\"value\"}\n",
"{\"@timestamp\":\"0001-01-01T00:00:00.000Z\",\"@metadata\":{\"beat\":\"test\",\"type\":\"doc\",\"version\":\"1.2.3\"},\"field\":\"value\"}\n",
},
{
"single json event (pretty=true)",
json.New(true),
json.New(true, "1.2.3"),
[]beat.Event{
{Fields: event("field", "value")},
},
"{\n \"@timestamp\": \"0001-01-01T00:00:00.000Z\",\n \"@metadata\": {\n \"beat\": \"test\",\n \"type\": \"doc\"\n },\n \"field\": \"value\"\n}\n",
"{\n \"@timestamp\": \"0001-01-01T00:00:00.000Z\",\n \"@metadata\": {\n \"beat\": \"test\",\n \"type\": \"doc\",\n \"version\": \"1.2.3\"\n },\n \"field\": \"value\"\n}\n",
},
// TODO: enable test after update fmtstr support to beat.Event
{
Expand Down
6 changes: 3 additions & 3 deletions libbeat/outputs/fileout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ func makeFileout(
cfg.SetInt("bulk_max_size", -1, -1)

fo := &fileOutput{beat: beat, stats: stats}
if err := fo.init(config); err != nil {
if err := fo.init(beat, config); err != nil {
return outputs.Fail(err)
}

return outputs.Success(-1, 0, fo)
}

func (out *fileOutput) init(config config) error {
func (out *fileOutput) init(beat beat.Info, config config) error {
var err error

out.rotator.Path = config.Path
Expand All @@ -51,7 +51,7 @@ func (out *fileOutput) init(config config) error {
out.rotator.Name = out.beat.Beat
}

enc, err := codec.CreateEncoder(config.Codec)
enc, err := codec.CreateEncoder(beat, config.Codec)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func makeKafka(
return outputs.Fail(err)
}

codec, err := codec.CreateEncoder(config.Codec)
codec, err := codec.CreateEncoder(beat, config.Codec)
if err != nil {
return outputs.Fail(err)
}
Expand Down
4 changes: 3 additions & 1 deletion libbeat/outputs/logstash/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net"
"time"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common/atomic"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
Expand Down Expand Up @@ -32,6 +33,7 @@ type msgRef struct {
}

func newAsyncClient(
beat beat.Info,
conn *transport.Client,
stats *outputs.Stats,
config *Config,
Expand All @@ -49,7 +51,7 @@ func newAsyncClient(
logp.Warn(`The async Logstash client does not support the "ttl" option`)
}

enc := makeLogstashEventEncoder(config.Index)
enc := makeLogstashEventEncoder(beat, config.Index)

queueSize := config.Pipelining - 1
timeout := config.Timeout
Expand Down
3 changes: 2 additions & 1 deletion libbeat/outputs/logstash/async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/outest"
"github.com/elastic/beats/libbeat/outputs/transport"
Expand Down Expand Up @@ -35,7 +36,7 @@ func makeAsyncTestClient(conn *transport.Client) testClientDriver {
config := defaultConfig
config.Timeout = 1 * time.Second
config.Pipelining = 3
client, err := newAsyncClient(conn, nil, &config)
client, err := newAsyncClient(beat.Info{}, conn, nil, &config)
if err != nil {
panic(err)
}
Expand Down
4 changes: 2 additions & 2 deletions libbeat/outputs/logstash/enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"github.com/elastic/beats/libbeat/outputs/codec/json"
)

func makeLogstashEventEncoder(index string) func(interface{}) ([]byte, error) {
enc := json.New(false)
func makeLogstashEventEncoder(info beat.Info, index string) func(interface{}) ([]byte, error) {
enc := json.New(false, info.Version)
return func(event interface{}) ([]byte, error) {
return enc.Encode(index, event.(*beat.Event))
}
Expand Down
30 changes: 0 additions & 30 deletions libbeat/outputs/logstash/event.go

This file was deleted.

4 changes: 2 additions & 2 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func makeLogstash(
}

if config.Pipelining > 0 {
client, err = newAsyncClient(conn, stats, config)
client, err = newAsyncClient(beat, conn, stats, config)
} else {
client, err = newSyncClient(conn, stats, config)
client, err = newSyncClient(beat, conn, stats, config)
}
if err != nil {
return outputs.Fail(err)
Expand Down
Loading

0 comments on commit 46dbdbd

Please sign in to comment.