From 96d18fec2e98716849de72990238b77944148f37 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Tue, 12 Sep 2017 21:07:28 +0200 Subject: [PATCH] Add @metadata.version to json events (#5166) * Add @metadata.version to json events - pass beat.Info into the codecs - update json encoder to include @metadata.version (for Console, File, Logstash, Redis, Kafka) * Update Ls docs to use the version field * Fix test build * s/second/third/ * Fix metricbeat unit tests (cherry picked from commit 46dbdbd4f59e3c5beb68e2f52a8e72df91b83d3e) --- CHANGELOG.asciidoc | 2 ++ libbeat/docs/gettingstarted.asciidoc | 5 ++-- libbeat/docs/outputconfig.asciidoc | 17 ++++++----- .../report/elasticsearch/elasticsearch.go | 1 + libbeat/outputs/codec/codec_reg.go | 7 +++-- libbeat/outputs/codec/format/format.go | 2 +- libbeat/outputs/codec/json/event.go | 16 +++++----- libbeat/outputs/codec/json/json.go | 21 ++++++++----- libbeat/outputs/codec/json/json_test.go | 9 +++--- libbeat/outputs/console/console.go | 4 +-- libbeat/outputs/console/console_test.go | 8 ++--- libbeat/outputs/fileout/file.go | 6 ++-- libbeat/outputs/kafka/kafka.go | 2 +- libbeat/outputs/logstash/async.go | 4 ++- libbeat/outputs/logstash/async_test.go | 3 +- libbeat/outputs/logstash/enc.go | 4 +-- libbeat/outputs/logstash/event.go | 30 ------------------- libbeat/outputs/logstash/logstash.go | 4 +-- libbeat/outputs/logstash/sync.go | 4 ++- libbeat/outputs/logstash/sync_test.go | 3 +- libbeat/outputs/redis/redis.go | 2 +- libbeat/publisher/pipeline/module.go | 2 +- libbeat/publisher/pipeline/pipeline.go | 4 +++ libbeat/publisher/pipeline/processor.go | 8 ++--- metricbeat/mb/module/example_test.go | 5 ++-- metricbeat/mb/testing/data_generator.go | 2 +- 26 files changed, 85 insertions(+), 90 deletions(-) delete mode 100644 libbeat/outputs/logstash/event.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 619676879f6c..bdb40895ca49 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -64,6 +64,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di *Affecting all Beats* +- Add @metadata.version to events send to Logstash. {pull}5166[5166] + *Auditbeat* - Changed the number of shards in the default configuration to 3. {issue}5095[5095] diff --git a/libbeat/docs/gettingstarted.asciidoc b/libbeat/docs/gettingstarted.asciidoc index 002c0adf4f88..df5db066c533 100644 --- a/libbeat/docs/gettingstarted.asciidoc +++ b/libbeat/docs/gettingstarted.asciidoc @@ -296,13 +296,14 @@ output { elasticsearch { hosts => "localhost:9200" manage_template => false - index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}" <1> + index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" <1> document_type => "%{[@metadata][type]}" <2> } } ------------------------------------------------------------------------------ <1> `%{[@metadata][beat]}` sets the first part of the index name to the value -of the `beat` metadata field, and `%{+YYYY.MM.dd}` sets the second part of the +of the `beat` metadata field, `%{[@metadata][version]}` sets the second part of +the name to the beat's version, and `%{+YYYY.MM.dd}` sets the third part of the name to a date based on the Logstash `@timestamp` field. For example: +{beatname_lc}-2017.03.29+. <2> `%{[@metadata][type]}` sets the document type based on the value of the `type` diff --git a/libbeat/docs/outputconfig.asciidoc b/libbeat/docs/outputconfig.asciidoc index a2443fc8abea..6f7beae0a027 100644 --- a/libbeat/docs/outputconfig.asciidoc +++ b/libbeat/docs/outputconfig.asciidoc @@ -325,18 +325,18 @@ use in Logstash for indexing and filtering: ... "@metadata": { <1> "beat": "{beatname_lc}", <2> - "type": "doc" <3> + "version": "{stack-version}" <3> + "type": "doc" <4> } } ------------------------------------------------------------------------------ -<1> {beatname_uc} uses the `@metadata` field to send metadata to Logstash. The -contents of the `@metadata` field only exist in Logstash and are not part of any -events sent from Logstash. See the +<1> {beatname_uc} uses the `@metadata` field to send metadata to Logstash. See the {logstashdoc}/event-dependent-configuration.html#metadata[Logstash documentation] for more about the `@metadata` field. <2> The default is {beatname_lc}. To change this value, set the <> option in the {beatname_uc} config file. -<3> The value of `type` is currently hardcoded to `doc`. It was used by previous +<3> The beats current version. +<4> The value of `type` is currently hardcoded to `doc`. It was used by previous Logstash configs to set the type of the document in Elasticsearch. @@ -362,14 +362,15 @@ input { output { elasticsearch { hosts => ["http://localhost:9200"] - index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}" <1> + index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" <1> } } ------------------------------------------------------------------------------ <1> `%{[@metadata][beat]}` sets the first part of the index name to the value -of the `beat` metadata field, and `%{+YYYY.MM.dd}` sets the second part of the +of the `beat` metadata field, `%{[@metadata][version]}` sets the second part to +the beat's version, and `%{+YYYY.MM.dd}` sets the third part of the name to a date based on the Logstash `@timestamp` field. For example: -+{beatname_lc}-2017.03.29+. ++{beatname_lc}-{stack-version}-2017.03.29+. Events indexed into Elasticsearch with the Logstash configuration shown here will be similar to events directly indexed by Beats into Elasticsearch. diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index 47bea10463b5..7075f38cc597 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -114,6 +114,7 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) { monitoring := monitoring.Default.NewRegistry("xpack.monitoring") pipeline, err := pipeline.New( + beat, monitoring, queueFactory, out, pipeline.Settings{ WaitClose: 0, diff --git a/libbeat/outputs/codec/codec_reg.go b/libbeat/outputs/codec/codec_reg.go index fa5d0ee4e71e..3512db3dd4c6 100644 --- a/libbeat/outputs/codec/codec_reg.go +++ b/libbeat/outputs/codec/codec_reg.go @@ -3,10 +3,11 @@ package codec import ( "fmt" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" ) -type Factory func(*common.Config) (Codec, error) +type Factory func(beat.Info, *common.Config) (Codec, error) type Config struct { Namespace common.ConfigNamespace `config:",inline"` @@ -21,7 +22,7 @@ func RegisterType(name string, gen Factory) { codecs[name] = gen } -func CreateEncoder(cfg Config) (Codec, error) { +func CreateEncoder(info beat.Info, cfg Config) (Codec, error) { // default to json codec codec := "json" if name := cfg.Namespace.Name(); name != "" { @@ -32,5 +33,5 @@ func CreateEncoder(cfg Config) (Codec, error) { if factory == nil { return nil, fmt.Errorf("'%v' output codec is not available", codec) } - return factory(cfg.Namespace.Config()) + return factory(info, cfg.Namespace.Config()) } diff --git a/libbeat/outputs/codec/format/format.go b/libbeat/outputs/codec/format/format.go index ab346ba9af3c..e26bc269c590 100644 --- a/libbeat/outputs/codec/format/format.go +++ b/libbeat/outputs/codec/format/format.go @@ -18,7 +18,7 @@ type Config struct { } func init() { - codec.RegisterType("format", func(cfg *common.Config) (codec.Codec, error) { + codec.RegisterType("format", func(_ beat.Info, cfg *common.Config) (codec.Codec, error) { config := Config{} if cfg == nil { return nil, errors.New("empty format codec configuration") diff --git a/libbeat/outputs/codec/json/event.go b/libbeat/outputs/codec/json/event.go index 417022630c86..e6905d352c04 100644 --- a/libbeat/outputs/codec/json/event.go +++ b/libbeat/outputs/codec/json/event.go @@ -17,18 +17,20 @@ type event struct { // Meta defines common event metadata to be stored in '@metadata' type meta struct { - Beat string `struct:"beat"` - Type string `struct:"type"` - Fields map[string]interface{} `struct:",inline"` + Beat string `struct:"beat"` + Type string `struct:"type"` + Version string `struct:"version"` + Fields map[string]interface{} `struct:",inline"` } -func makeEvent(index string, in *beat.Event) event { +func makeEvent(index, version string, in *beat.Event) event { return event{ Timestamp: in.Timestamp, Meta: meta{ - Beat: index, - Type: "doc", - Fields: in.Meta, + Beat: index, + Version: version, + Type: "doc", + Fields: in.Meta, }, Fields: in.Fields, } diff --git a/libbeat/outputs/codec/json/json.go b/libbeat/outputs/codec/json/json.go index ea74a024305b..a016cf10b704 100644 --- a/libbeat/outputs/codec/json/json.go +++ b/libbeat/outputs/codec/json/json.go @@ -12,10 +12,12 @@ import ( "github.com/elastic/beats/libbeat/outputs/codec" ) +// Encoder for serializing a beat.Event to json. type Encoder struct { - buf bytes.Buffer - folder *gotype.Iterator - pretty bool + buf bytes.Buffer + folder *gotype.Iterator + pretty bool + version string } type config struct { @@ -27,7 +29,7 @@ var defaultConfig = config{ } func init() { - codec.RegisterType("json", func(cfg *common.Config) (codec.Codec, error) { + codec.RegisterType("json", func(info beat.Info, cfg *common.Config) (codec.Codec, error) { config := defaultConfig if cfg != nil { if err := cfg.Unpack(&config); err != nil { @@ -35,12 +37,13 @@ func init() { } } - return New(config.Pretty), nil + return New(config.Pretty, info.Version), nil }) } -func New(pretty bool) *Encoder { - e := &Encoder{pretty: pretty} +// New creates a new json Encoder. +func New(pretty bool, version string) *Encoder { + e := &Encoder{pretty: pretty, version: version} e.reset() return e } @@ -62,9 +65,11 @@ func (e *Encoder) reset() { } } +// Encode serializies a beat event to JSON. It adds additional metadata in the +// `@metadata` namespace. func (e *Encoder) Encode(index string, event *beat.Event) ([]byte, error) { e.buf.Reset() - err := e.folder.Fold(makeEvent(index, event)) + err := e.folder.Fold(makeEvent(index, e.version, event)) if err != nil { e.reset() return nil, err diff --git a/libbeat/outputs/codec/json/json_test.go b/libbeat/outputs/codec/json/json_test.go index f8f1f97b6564..3e9d07186cad 100644 --- a/libbeat/outputs/codec/json/json_test.go +++ b/libbeat/outputs/codec/json/json_test.go @@ -8,9 +8,9 @@ import ( ) func TestJsonCodec(t *testing.T) { - expectedValue := `{"@timestamp":"0001-01-01T00:00:00.000Z","@metadata":{"beat":"test","type":"doc"},"msg":"message"}` + expectedValue := `{"@timestamp":"0001-01-01T00:00:00.000Z","@metadata":{"beat":"test","type":"doc","version":"1.2.3"},"msg":"message"}` - codec := New(false) + codec := New(false, "1.2.3") output, err := codec.Encode("test", &beat.Event{Fields: common.MapStr{"msg": "message"}}) if err != nil { @@ -27,12 +27,13 @@ func TestJsonWriterPrettyPrint(t *testing.T) { "@timestamp": "0001-01-01T00:00:00.000Z", "@metadata": { "beat": "test", - "type": "doc" + "type": "doc", + "version": "1.2.3" }, "msg": "message" }` - codec := New(true) + codec := New(true, "1.2.3") output, err := codec.Encode("test", &beat.Event{Fields: common.MapStr{"msg": "message"}}) if err != nil { diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go index 9ef7e84a90c2..605121535fcd 100644 --- a/libbeat/outputs/console/console.go +++ b/libbeat/outputs/console/console.go @@ -48,12 +48,12 @@ func makeConsole( var enc codec.Codec if config.Codec.Namespace.IsSet() { - enc, err = codec.CreateEncoder(config.Codec) + enc, err = codec.CreateEncoder(beat, config.Codec) if err != nil { return outputs.Fail(err) } } else { - enc = json.New(config.Pretty) + enc = json.New(config.Pretty, beat.Version) } index := beat.Beat diff --git a/libbeat/outputs/console/console_test.go b/libbeat/outputs/console/console_test.go index 6ba0c17dbd75..92bebd619a8b 100644 --- a/libbeat/outputs/console/console_test.go +++ b/libbeat/outputs/console/console_test.go @@ -60,19 +60,19 @@ func TestConsoleOutput(t *testing.T) { }{ { "single json event (pretty=false)", - json.New(false), + json.New(false, "1.2.3"), []beat.Event{ {Fields: event("field", "value")}, }, - "{\"@timestamp\":\"0001-01-01T00:00:00.000Z\",\"@metadata\":{\"beat\":\"test\",\"type\":\"doc\"},\"field\":\"value\"}\n", + "{\"@timestamp\":\"0001-01-01T00:00:00.000Z\",\"@metadata\":{\"beat\":\"test\",\"type\":\"doc\",\"version\":\"1.2.3\"},\"field\":\"value\"}\n", }, { "single json event (pretty=true)", - json.New(true), + json.New(true, "1.2.3"), []beat.Event{ {Fields: event("field", "value")}, }, - "{\n \"@timestamp\": \"0001-01-01T00:00:00.000Z\",\n \"@metadata\": {\n \"beat\": \"test\",\n \"type\": \"doc\"\n },\n \"field\": \"value\"\n}\n", + "{\n \"@timestamp\": \"0001-01-01T00:00:00.000Z\",\n \"@metadata\": {\n \"beat\": \"test\",\n \"type\": \"doc\",\n \"version\": \"1.2.3\"\n },\n \"field\": \"value\"\n}\n", }, // TODO: enable test after update fmtstr support to beat.Event { diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index faa9cc79d7cb..bf569058f399 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -35,14 +35,14 @@ func makeFileout( cfg.SetInt("bulk_max_size", -1, -1) fo := &fileOutput{beat: beat, stats: stats} - if err := fo.init(config); err != nil { + if err := fo.init(beat, config); err != nil { return outputs.Fail(err) } return outputs.Success(-1, 0, fo) } -func (out *fileOutput) init(config config) error { +func (out *fileOutput) init(beat beat.Info, config config) error { var err error out.rotator.Path = config.Path @@ -51,7 +51,7 @@ func (out *fileOutput) init(config config) error { out.rotator.Name = out.beat.Beat } - enc, err := codec.CreateEncoder(config.Codec) + enc, err := codec.CreateEncoder(beat, config.Codec) if err != nil { return err } diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index c7d2cc21bef7..9896e72ef4a2 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -143,7 +143,7 @@ func makeKafka( return outputs.Fail(err) } - codec, err := codec.CreateEncoder(config.Codec) + codec, err := codec.CreateEncoder(beat, config.Codec) if err != nil { return outputs.Fail(err) } diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index 10c0ed1fe23f..70eb6609f58d 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -4,6 +4,7 @@ import ( "net" "time" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common/atomic" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" @@ -32,6 +33,7 @@ type msgRef struct { } func newAsyncClient( + beat beat.Info, conn *transport.Client, stats *outputs.Stats, config *Config, @@ -49,7 +51,7 @@ func newAsyncClient( logp.Warn(`The async Logstash client does not support the "ttl" option`) } - enc := makeLogstashEventEncoder(config.Index) + enc := makeLogstashEventEncoder(beat, config.Index) queueSize := config.Pipelining - 1 timeout := config.Timeout diff --git a/libbeat/outputs/logstash/async_test.go b/libbeat/outputs/logstash/async_test.go index 4964b187599b..2bd94e244f37 100644 --- a/libbeat/outputs/logstash/async_test.go +++ b/libbeat/outputs/logstash/async_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/outest" "github.com/elastic/beats/libbeat/outputs/transport" @@ -35,7 +36,7 @@ func makeAsyncTestClient(conn *transport.Client) testClientDriver { config := defaultConfig config.Timeout = 1 * time.Second config.Pipelining = 3 - client, err := newAsyncClient(conn, nil, &config) + client, err := newAsyncClient(beat.Info{}, conn, nil, &config) if err != nil { panic(err) } diff --git a/libbeat/outputs/logstash/enc.go b/libbeat/outputs/logstash/enc.go index 28969ad184cc..639633006554 100644 --- a/libbeat/outputs/logstash/enc.go +++ b/libbeat/outputs/logstash/enc.go @@ -5,8 +5,8 @@ import ( "github.com/elastic/beats/libbeat/outputs/codec/json" ) -func makeLogstashEventEncoder(index string) func(interface{}) ([]byte, error) { - enc := json.New(false) +func makeLogstashEventEncoder(info beat.Info, index string) func(interface{}) ([]byte, error) { + enc := json.New(false, info.Version) return func(event interface{}) ([]byte, error) { return enc.Encode(index, event.(*beat.Event)) } diff --git a/libbeat/outputs/logstash/event.go b/libbeat/outputs/logstash/event.go deleted file mode 100644 index ce223fb6c5bb..000000000000 --- a/libbeat/outputs/logstash/event.go +++ /dev/null @@ -1,30 +0,0 @@ -package logstash - -/* -// Event describes the event strucutre for events -// (in-)directly send to logstash -type Event struct { - Timestamp time.Time `struct:"@timestamp"` - Meta Meta `struct:"@metadata"` - Fields common.MapStr `struct:",inline"` -} - -// Meta defines common event metadata to be stored in '@metadata' -type Meta struct { - Beat string `struct:"beat"` - Type string `struct:"type"` - Fields map[string]interface{} `struct:",inline"` -} - -func MakeEvent(index string, event *beat.Event) Event { - return Event{ - Timestamp: event.Timestamp, - Meta: Meta{ - Beat: index, - Type: "doc", - Fields: event.Meta, - }, - Fields: event.Fields, - } -} -*/ diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index d8261586e827..6e519f543b3f 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -60,9 +60,9 @@ func makeLogstash( } if config.Pipelining > 0 { - client, err = newAsyncClient(conn, stats, config) + client, err = newAsyncClient(beat, conn, stats, config) } else { - client, err = newSyncClient(conn, stats, config) + client, err = newSyncClient(beat, conn, stats, config) } if err != nil { return outputs.Fail(err) diff --git a/libbeat/outputs/logstash/sync.go b/libbeat/outputs/logstash/sync.go index 4bf2bdbd98b5..08bf181b776e 100644 --- a/libbeat/outputs/logstash/sync.go +++ b/libbeat/outputs/logstash/sync.go @@ -3,6 +3,7 @@ package logstash import ( "time" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/transport" @@ -20,6 +21,7 @@ type syncClient struct { } func newSyncClient( + beat beat.Info, conn *transport.Client, stats *outputs.Stats, config *Config, @@ -38,7 +40,7 @@ func newSyncClient( } var err error - enc := makeLogstashEventEncoder(config.Index) + enc := makeLogstashEventEncoder(beat, config.Index) c.client, err = v2.NewSyncClientWithConn(conn, v2.JSONEncoder(enc), v2.Timeout(config.Timeout), diff --git a/libbeat/outputs/logstash/sync_test.go b/libbeat/outputs/logstash/sync_test.go index 932aa484ceb9..ab89c3e56a7f 100644 --- a/libbeat/outputs/logstash/sync_test.go +++ b/libbeat/outputs/logstash/sync_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/outest" "github.com/elastic/beats/libbeat/outputs/transport" @@ -48,7 +49,7 @@ func makeTestClient(conn *transport.Client) testClientDriver { config := defaultConfig config.Timeout = 1 * time.Second config.TTL = 5 * time.Second - client, err := newSyncClient(conn, nil, &config) + client, err := newSyncClient(beat.Info{}, conn, nil, &config) if err != nil { panic(err) } diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go index 2277d6dce27e..53df2c834272 100644 --- a/libbeat/outputs/redis/redis.go +++ b/libbeat/outputs/redis/redis.go @@ -94,7 +94,7 @@ func makeRedis( clients := make([]outputs.NetworkClient, len(hosts)) for i, host := range hosts { - enc, err := codec.CreateEncoder(config.Codec) + enc, err := codec.CreateEncoder(beat, config.Codec) if err != nil { return outputs.Fail(err) } diff --git a/libbeat/publisher/pipeline/module.go b/libbeat/publisher/pipeline/module.go index 769d7a0a536f..302d4fa8d8de 100644 --- a/libbeat/publisher/pipeline/module.go +++ b/libbeat/publisher/pipeline/module.go @@ -72,7 +72,7 @@ func Load( return nil, err } - p, err := New(reg, queueBuilder, out, settings) + p, err := New(beatInfo, reg, queueBuilder, out, settings) if err != nil { return nil, err } diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 8af8ee3bdf7d..18cafa356cff 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -33,6 +33,8 @@ import ( // Processors in the pipeline are executed in the clients go-routine, before // entering the queue. No filtering/processing will occur on the output side. type Pipeline struct { + beatInfo beat.Info + logger *logp.Logger queue queue.Queue output *outputController @@ -130,6 +132,7 @@ type queueFactory func(queue.Eventer) (queue.Queue, error) // The new pipeline will take ownership of queue and outputs. On Close, the // queue and outputs will be closed. func New( + beat beat.Info, metrics *monitoring.Registry, queueFactory queueFactory, out outputs.Group, @@ -142,6 +145,7 @@ func New( processors := settings.Processors disabledOutput := settings.Disabled p := &Pipeline{ + beatInfo: beat, logger: log, waitCloseMode: settings.WaitCloseMode, waitCloseTimeout: settings.WaitClose, diff --git a/libbeat/publisher/pipeline/processor.go b/libbeat/publisher/pipeline/processor.go index 209150f58219..2c8d3f245fa1 100644 --- a/libbeat/publisher/pipeline/processor.go +++ b/libbeat/publisher/pipeline/processor.go @@ -95,7 +95,7 @@ func (p *Pipeline) newProcessorPipeline( // setup 9: debug print final event (P) if logp.IsDebug("publish") { - processors.add(debugPrintProcessor()) + processors.add(debugPrintProcessor(p.beatInfo)) } // setup 10: drop all events if outputs are disabled (P) @@ -244,17 +244,17 @@ func makeAddFieldsProcessor(name string, fields common.MapStr, copy bool) *proce return newAnnotateProcessor(name, fn) } -func debugPrintProcessor() *processorFn { +func debugPrintProcessor(info beat.Info) *processorFn { // ensure only one go-routine is using the encoder (in case // beat.Client is shared between multiple go-routines by accident) var mux sync.Mutex - encoder := json.New(true) + encoder := json.New(true, info.Version) return newProcessor("debugPrint", func(event *beat.Event) (*beat.Event, error) { mux.Lock() defer mux.Unlock() - b, err := encoder.Encode("", event) + b, err := encoder.Encode(info.Beat, event) if err != nil { return event, nil } diff --git a/metricbeat/mb/module/example_test.go b/metricbeat/mb/module/example_test.go index c90687eff328..750c48f3a5f7 100644 --- a/metricbeat/mb/module/example_test.go +++ b/metricbeat/mb/module/example_test.go @@ -70,7 +70,8 @@ func ExampleWrapper() { // { // "@metadata": { // "beat": "noindex", - // "type": "doc" + // "type": "doc", + // "version": "1.2.3" // }, // "@timestamp": "2016-05-10T23:27:58.485Z", // "fake": { @@ -130,7 +131,7 @@ func ExampleRunner() { } func encodeEvent(event beat.Event) (string, error) { - output, err := json.New(false).Encode("noindex", &event) + output, err := json.New(false, "1.2.3").Encode("noindex", &event) if err != nil { return "", nil } diff --git a/metricbeat/mb/testing/data_generator.go b/metricbeat/mb/testing/data_generator.go index 77ad9922c844..3d01260f186e 100644 --- a/metricbeat/mb/testing/data_generator.go +++ b/metricbeat/mb/testing/data_generator.go @@ -102,7 +102,7 @@ func WriteEventToDataJSON(t testing.TB, fullEvent beat.Event) { } // use json output codec to encode event to json - output, err := json.New(true).Encode("noindex", &fullEvent) + output, err := json.New(true, "1.2.3").Encode("noindex", &fullEvent) if err != nil { t.Fatal(err) }