Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add @metadata.version to json events #5166

Merged
merged 5 commits into from
Sep 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Changed the hashbang used in the beat helper script from `/bin/bash` to `/usr/bin/env bash`. {pull}5051[5051]
- Changed beat helper script to use `exec` when running the beat. {pull}5051[5051]
- Fix reloader error message to only print on actual error {pull}5066[5066]
- Add @metadata.version to events send to Logstash. {pull}5166[5166]

*Auditbeat*

Expand Down
5 changes: 3 additions & 2 deletions libbeat/docs/gettingstarted.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -296,13 +296,14 @@ output {
elasticsearch {
hosts => "localhost:9200"
manage_template => false
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}" <1>
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" <1>
document_type => "%{[@metadata][type]}" <2>
}
}
------------------------------------------------------------------------------
<1> `%{[@metadata][beat]}` sets the first part of the index name to the value
of the `beat` metadata field, and `%{+YYYY.MM.dd}` sets the second part of the
of the `beat` metadata field, `%{[@metadata][version]}` sets the second part of
the name to the beat's version, and `%{+YYYY.MM.dd}` sets the third part of the
name to a date based on the Logstash `@timestamp` field. For example:
+{beatname_lc}-2017.03.29+.
<2> `%{[@metadata][type]}` sets the document type based on the value of the `type`
Expand Down
17 changes: 9 additions & 8 deletions libbeat/docs/outputconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -322,18 +322,18 @@ use in Logstash for indexing and filtering:
...
"@metadata": { <1>
"beat": "{beatname_lc}", <2>
"type": "doc" <3>
"version": "{stack-version}" <3>
"type": "doc" <4>
}
}
------------------------------------------------------------------------------
<1> {beatname_uc} uses the `@metadata` field to send metadata to Logstash. The
contents of the `@metadata` field only exist in Logstash and are not part of any
events sent from Logstash. See the
<1> {beatname_uc} uses the `@metadata` field to send metadata to Logstash. See the
{logstashdoc}/event-dependent-configuration.html#metadata[Logstash documentation]
for more about the `@metadata` field.
<2> The default is {beatname_lc}. To change this value, set the
<<logstash-index,`index`>> option in the {beatname_uc} config file.
<3> The value of `type` is currently hardcoded to `doc`. It was used by previous
<3> The beats current version.
<4> The value of `type` is currently hardcoded to `doc`. It was used by previous
Logstash configs to set the type of the document in Elasticsearch.


Expand All @@ -359,14 +359,15 @@ input {
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}" <1>
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" <1>
}
}
------------------------------------------------------------------------------
<1> `%{[@metadata][beat]}` sets the first part of the index name to the value
of the `beat` metadata field, and `%{+YYYY.MM.dd}` sets the second part of the
of the `beat` metadata field, `%{[@metadata][version]}` sets the second part to
the beat's version, and `%{+YYYY.MM.dd}` sets the third part of the
name to a date based on the Logstash `@timestamp` field. For example:
+{beatname_lc}-2017.03.29+.
+{beatname_lc}-{stack-version}-2017.03.29+.

Events indexed into Elasticsearch with the Logstash configuration shown here
will be similar to events directly indexed by Beats into Elasticsearch.
Expand Down
1 change: 1 addition & 0 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
monitoring := monitoring.Default.NewRegistry("xpack.monitoring")

pipeline, err := pipeline.New(
beat,
monitoring,
queueFactory, out, pipeline.Settings{
WaitClose: 0,
Expand Down
7 changes: 4 additions & 3 deletions libbeat/outputs/codec/codec_reg.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package codec
import (
"fmt"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

type Factory func(*common.Config) (Codec, error)
type Factory func(beat.Info, *common.Config) (Codec, error)

type Config struct {
Namespace common.ConfigNamespace `config:",inline"`
Expand All @@ -21,7 +22,7 @@ func RegisterType(name string, gen Factory) {
codecs[name] = gen
}

func CreateEncoder(cfg Config) (Codec, error) {
func CreateEncoder(info beat.Info, cfg Config) (Codec, error) {
// default to json codec
codec := "json"
if name := cfg.Namespace.Name(); name != "" {
Expand All @@ -32,5 +33,5 @@ func CreateEncoder(cfg Config) (Codec, error) {
if factory == nil {
return nil, fmt.Errorf("'%v' output codec is not available", codec)
}
return factory(cfg.Namespace.Config())
return factory(info, cfg.Namespace.Config())
}
2 changes: 1 addition & 1 deletion libbeat/outputs/codec/format/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Config struct {
}

func init() {
codec.RegisterType("format", func(cfg *common.Config) (codec.Codec, error) {
codec.RegisterType("format", func(_ beat.Info, cfg *common.Config) (codec.Codec, error) {
config := Config{}
if cfg == nil {
return nil, errors.New("empty format codec configuration")
Expand Down
16 changes: 9 additions & 7 deletions libbeat/outputs/codec/json/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@ type event struct {

// Meta defines common event metadata to be stored in '@metadata'
type meta struct {
Beat string `struct:"beat"`
Type string `struct:"type"`
Fields map[string]interface{} `struct:",inline"`
Beat string `struct:"beat"`
Type string `struct:"type"`
Version string `struct:"version"`
Fields map[string]interface{} `struct:",inline"`
}

func makeEvent(index string, in *beat.Event) event {
func makeEvent(index, version string, in *beat.Event) event {
return event{
Timestamp: in.Timestamp,
Meta: meta{
Beat: index,
Type: "doc",
Fields: in.Meta,
Beat: index,
Version: version,
Type: "doc",
Fields: in.Meta,
},
Fields: in.Fields,
}
Expand Down
21 changes: 13 additions & 8 deletions libbeat/outputs/codec/json/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (
"github.com/elastic/beats/libbeat/outputs/codec"
)

// Encoder for serializing a beat.Event to json.
type Encoder struct {
buf bytes.Buffer
folder *gotype.Iterator
pretty bool
buf bytes.Buffer
folder *gotype.Iterator
pretty bool
version string
}

type config struct {
Expand All @@ -27,20 +29,21 @@ var defaultConfig = config{
}

func init() {
codec.RegisterType("json", func(cfg *common.Config) (codec.Codec, error) {
codec.RegisterType("json", func(info beat.Info, cfg *common.Config) (codec.Codec, error) {
config := defaultConfig
if cfg != nil {
if err := cfg.Unpack(&config); err != nil {
return nil, err
}
}

return New(config.Pretty), nil
return New(config.Pretty, info.Version), nil
})
}

func New(pretty bool) *Encoder {
e := &Encoder{pretty: pretty}
// New creates a new json Encoder.
func New(pretty bool, version string) *Encoder {
e := &Encoder{pretty: pretty, version: version}
e.reset()
return e
}
Expand All @@ -62,9 +65,11 @@ func (e *Encoder) reset() {
}
}

// Encode serializies a beat event to JSON. It adds additional metadata in the
// `@metadata` namespace.
func (e *Encoder) Encode(index string, event *beat.Event) ([]byte, error) {
e.buf.Reset()
err := e.folder.Fold(makeEvent(index, event))
err := e.folder.Fold(makeEvent(index, e.version, event))
if err != nil {
e.reset()
return nil, err
Expand Down
9 changes: 5 additions & 4 deletions libbeat/outputs/codec/json/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
)

func TestJsonCodec(t *testing.T) {
expectedValue := `{"@timestamp":"0001-01-01T00:00:00.000Z","@metadata":{"beat":"test","type":"doc"},"msg":"message"}`
expectedValue := `{"@timestamp":"0001-01-01T00:00:00.000Z","@metadata":{"beat":"test","type":"doc","version":"1.2.3"},"msg":"message"}`

codec := New(false)
codec := New(false, "1.2.3")
output, err := codec.Encode("test", &beat.Event{Fields: common.MapStr{"msg": "message"}})

if err != nil {
Expand All @@ -27,12 +27,13 @@ func TestJsonWriterPrettyPrint(t *testing.T) {
"@timestamp": "0001-01-01T00:00:00.000Z",
"@metadata": {
"beat": "test",
"type": "doc"
"type": "doc",
"version": "1.2.3"
},
"msg": "message"
}`

codec := New(true)
codec := New(true, "1.2.3")
output, err := codec.Encode("test", &beat.Event{Fields: common.MapStr{"msg": "message"}})

if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ func makeConsole(

var enc codec.Codec
if config.Codec.Namespace.IsSet() {
enc, err = codec.CreateEncoder(config.Codec)
enc, err = codec.CreateEncoder(beat, config.Codec)
if err != nil {
return outputs.Fail(err)
}
} else {
enc = json.New(config.Pretty)
enc = json.New(config.Pretty, beat.Version)
}

index := beat.Beat
Expand Down
8 changes: 4 additions & 4 deletions libbeat/outputs/console/console_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,19 @@ func TestConsoleOutput(t *testing.T) {
}{
{
"single json event (pretty=false)",
json.New(false),
json.New(false, "1.2.3"),
[]beat.Event{
{Fields: event("field", "value")},
},
"{\"@timestamp\":\"0001-01-01T00:00:00.000Z\",\"@metadata\":{\"beat\":\"test\",\"type\":\"doc\"},\"field\":\"value\"}\n",
"{\"@timestamp\":\"0001-01-01T00:00:00.000Z\",\"@metadata\":{\"beat\":\"test\",\"type\":\"doc\",\"version\":\"1.2.3\"},\"field\":\"value\"}\n",
},
{
"single json event (pretty=true)",
json.New(true),
json.New(true, "1.2.3"),
[]beat.Event{
{Fields: event("field", "value")},
},
"{\n \"@timestamp\": \"0001-01-01T00:00:00.000Z\",\n \"@metadata\": {\n \"beat\": \"test\",\n \"type\": \"doc\"\n },\n \"field\": \"value\"\n}\n",
"{\n \"@timestamp\": \"0001-01-01T00:00:00.000Z\",\n \"@metadata\": {\n \"beat\": \"test\",\n \"type\": \"doc\",\n \"version\": \"1.2.3\"\n },\n \"field\": \"value\"\n}\n",
},
// TODO: enable test after update fmtstr support to beat.Event
{
Expand Down
6 changes: 3 additions & 3 deletions libbeat/outputs/fileout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ func makeFileout(
cfg.SetInt("bulk_max_size", -1, -1)

fo := &fileOutput{beat: beat, stats: stats}
if err := fo.init(config); err != nil {
if err := fo.init(beat, config); err != nil {
return outputs.Fail(err)
}

return outputs.Success(-1, 0, fo)
}

func (out *fileOutput) init(config config) error {
func (out *fileOutput) init(beat beat.Info, config config) error {
var err error

out.rotator.Path = config.Path
Expand All @@ -51,7 +51,7 @@ func (out *fileOutput) init(config config) error {
out.rotator.Name = out.beat.Beat
}

enc, err := codec.CreateEncoder(config.Codec)
enc, err := codec.CreateEncoder(beat, config.Codec)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func makeKafka(
return outputs.Fail(err)
}

codec, err := codec.CreateEncoder(config.Codec)
codec, err := codec.CreateEncoder(beat, config.Codec)
if err != nil {
return outputs.Fail(err)
}
Expand Down
4 changes: 3 additions & 1 deletion libbeat/outputs/logstash/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net"
"time"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common/atomic"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
Expand Down Expand Up @@ -32,6 +33,7 @@ type msgRef struct {
}

func newAsyncClient(
beat beat.Info,
conn *transport.Client,
stats *outputs.Stats,
config *Config,
Expand All @@ -49,7 +51,7 @@ func newAsyncClient(
logp.Warn(`The async Logstash client does not support the "ttl" option`)
}

enc := makeLogstashEventEncoder(config.Index)
enc := makeLogstashEventEncoder(beat, config.Index)

queueSize := config.Pipelining - 1
timeout := config.Timeout
Expand Down
3 changes: 2 additions & 1 deletion libbeat/outputs/logstash/async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/outest"
"github.com/elastic/beats/libbeat/outputs/transport"
Expand Down Expand Up @@ -35,7 +36,7 @@ func makeAsyncTestClient(conn *transport.Client) testClientDriver {
config := defaultConfig
config.Timeout = 1 * time.Second
config.Pipelining = 3
client, err := newAsyncClient(conn, nil, &config)
client, err := newAsyncClient(beat.Info{}, conn, nil, &config)
if err != nil {
panic(err)
}
Expand Down
4 changes: 2 additions & 2 deletions libbeat/outputs/logstash/enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"github.com/elastic/beats/libbeat/outputs/codec/json"
)

func makeLogstashEventEncoder(index string) func(interface{}) ([]byte, error) {
enc := json.New(false)
func makeLogstashEventEncoder(info beat.Info, index string) func(interface{}) ([]byte, error) {
enc := json.New(false, info.Version)
return func(event interface{}) ([]byte, error) {
return enc.Encode(index, event.(*beat.Event))
}
Expand Down
30 changes: 0 additions & 30 deletions libbeat/outputs/logstash/event.go

This file was deleted.

4 changes: 2 additions & 2 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func makeLogstash(
}

if config.Pipelining > 0 {
client, err = newAsyncClient(conn, stats, config)
client, err = newAsyncClient(beat, conn, stats, config)
} else {
client, err = newSyncClient(conn, stats, config)
client, err = newSyncClient(beat, conn, stats, config)
}
if err != nil {
return outputs.Fail(err)
Expand Down
Loading