From 533d5a0a0fdc889130d607e873ad5d13ad0f0f15 Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Mon, 19 Jul 2021 12:23:03 +0200 Subject: [PATCH 01/13] Adding functionality to death letter index instead of dropping events. Currently, when there's a mapping conflict, events are dropped; this commit allows one to attempt to index them into a different index instead, so they can be re-ingested later rather than being dropped or stalling ingest. The new code tries to index to the death letter index for a wider range of failures, including read-only block (403), which is likely to fail for the death letter index as well. The index currently must reside on the same ES cluster. Changing this, if desired, would be a separate PR. The format stored on the death letter index is similar to the one we already use: the original message is escaped JSON in the message field. --- libbeat/outputs/elasticsearch/client.go | 54 +++++++--- .../elasticsearch/client_integration_test.go | 63 +++++++++++ libbeat/outputs/elasticsearch/client_test.go | 101 ++++++++++++++++-- libbeat/outputs/elasticsearch/config.go | 55 +++++++--- .../elasticsearch/death_letter_selector.go | 19 ++++ .../outputs/elasticsearch/elasticsearch.go | 14 ++- 6 files changed, 266 insertions(+), 40 deletions(-) create mode 100644 libbeat/outputs/elasticsearch/death_letter_selector.go diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index b388001aeec..d86ad701e49 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -45,7 +45,8 @@ type Client struct { index outputs.IndexSelector pipeline *outil.Selector - observer outputs.Observer + observer outputs.Observer + NonIndexableAction string log *logp.Logger } @@ -53,16 +54,17 @@ type Client struct { // ClientSettings contains the settings for a client.
type ClientSettings struct { eslegclient.ConnectionSettings - Index outputs.IndexSelector - Pipeline *outil.Selector - Observer outputs.Observer + Index outputs.IndexSelector + Pipeline *outil.Selector + Observer outputs.Observer + NonIndexableAction string } type bulkResultStats struct { acked int // number of events ACKed by Elasticsearch duplicates int // number of events failed with `create` due to ID already being indexed fails int // number of failed events (can be retried) - nonIndexable int // number of failed events (not indexable -> must be dropped) + nonIndexable int // number of failed events (not indexable) tooMany int // number of events receiving HTTP 429 Too Many Requests } @@ -123,11 +125,11 @@ func NewClient( } client := &Client{ - conn: *conn, - index: s.Index, - pipeline: pipeline, - - observer: s.Observer, + conn: *conn, + index: s.Index, + pipeline: pipeline, + observer: s.Observer, + NonIndexableAction: s.NonIndexableAction, log: logp.NewLogger("elasticsearch"), } @@ -235,7 +237,7 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) failedEvents = data stats.fails = len(failedEvents) } else { - failedEvents, stats = bulkCollectPublishFails(client.log, result, data) + failedEvents, stats = bulkCollectPublishFails(client.log, result, data, client.NonIndexableAction) } failed := len(failedEvents) @@ -368,6 +370,7 @@ func bulkCollectPublishFails( log *logp.Logger, result eslegclient.BulkResult, data []publisher.Event, + action string, ) ([]publisher.Event, bulkResultStats) { reader := newJSONReader(result) if err := bulkReadToItems(reader); err != nil { @@ -401,10 +404,31 @@ func bulkCollectPublishFails( if status == http.StatusTooManyRequests { stats.tooMany++ } else { - // hard failure, don't collect - log.Warnf("Cannot index event %#v (status=%v): %s", data[i], status, msg) - stats.nonIndexable++ - continue + // hard failure, apply policy action + result, _ := data[i].Content.Meta.HasKey("deathlettered") + if 
result { + stats.nonIndexable++ + log.Errorf("Can't deliver to death letter index event %#v (status=%v): %s", data[i], status, msg) + // poison pill - this will clog the pipeline if the underlying failure is non transient. + } else if action == "death_letter_index" { + log.Warnf("Cannot index event %#v (status=%v): %s, trying death letter index", data[i], status, msg) + if data[i].Content.Meta == nil { + data[i].Content.Meta = common.MapStr{ + "deathlettered": true, + } + } else { + _, _ = data[i].Content.Meta.Put("deathlettered", true) + } + data[i].Content.Fields = common.MapStr{ + "message": data[i].Content.Fields.String(), + "error.type": status, + "error.message": string(msg), + } + } else { // drop + stats.nonIndexable++ + log.Warnf("Cannot index event %#v (status=%v): %s, dropping event!", data[i], status, msg) + continue + } } } diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 67ccc34b8f7..d1959a5bff4 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -200,6 +200,69 @@ func TestClientPublishEventWithPipeline(t *testing.T) { assert.Equal(t, 1, getCount("testfield:0")) // no pipeline } +func TestClientBulkPublishEventsWithDeathletterIndex(t *testing.T) { + type obj map[string]interface{} + + logp.TestingSetup(logp.WithSelectors("elasticsearch")) + + index := "beat-int-test-dli-index" + deathletterIndex := "beat-int-test-dli-deathletter-index" + + output, client := connectTestEsWithoutStats(t, obj{ + "index": index, + "non_indexable_policy": NonIndexablePolicy{ + Action: "death_letter_index", + Index: deathletterIndex, + }, + }) + client.conn.Delete(index, "", "", nil) + client.conn.Delete(deathletterIndex, "", "", nil) + + err := output.Publish(context.Background(), outest.NewBatch(beat.Event{ + Timestamp: time.Now(), + Fields: common.MapStr{ + "type": "libbeat", + "message": "Test message 
1", + "testfield": 0, + }, + })) + if err != nil { + t.Fatal(err) + } + + batch := outest.NewBatch(beat.Event{ + Timestamp: time.Now(), + Fields: common.MapStr{ + "type": "libbeat", + "message": "Test message 2", + "testfield": "foo0", + }, + }) + err = output.Publish(context.Background(), batch) + if err == nil { + t.Fatal("Expecting mapping conflict") + } + _, _, err = client.conn.Refresh(deathletterIndex) + if err == nil { + t.Fatal("expecting index to not exist yet") + } + err = output.Publish(context.Background(), batch) + if err != nil { + t.Fatal(err) + } + + _, _, err = client.conn.Refresh(index) + if err != nil { + t.Fatal(err) + } + + _, _, err = client.conn.Refresh(deathletterIndex) + if err != nil { + t.Fatal(err) + } + +} + func TestClientBulkPublishEventsWithPipeline(t *testing.T) { type obj map[string]interface{} diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index bd28fe5850b..0a7dcf31148 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -54,7 +54,7 @@ func TestCollectPublishFailsNone(t *testing.T) { events[i] = publisher.Event{Content: beat.Event{Fields: event}} } - res, _ := bulkCollectPublishFails(logp.L(), response, events) + res, _ := bulkCollectPublishFails(logp.L(), response, events, "drop") assert.Equal(t, 0, len(res)) } @@ -71,12 +71,97 @@ func TestCollectPublishFailMiddle(t *testing.T) { eventFail := publisher.Event{Content: beat.Event{Fields: common.MapStr{"field": 2}}} events := []publisher.Event{event, eventFail, event} - res, stats := bulkCollectPublishFails(logp.L(), response, events) + res, stats := bulkCollectPublishFails(logp.L(), response, events, "drop") assert.Equal(t, 1, len(res)) if len(res) == 1 { assert.Equal(t, eventFail, res[0]) } - assert.Equal(t, stats, bulkResultStats{acked: 2, fails: 1, tooMany: 1}) + assert.Equal(t, bulkResultStats{acked: 2, fails: 1, tooMany: 1}, stats) +} + +func 
TestCollectPublishFailDeathLetterQueue(t *testing.T) { + response := []byte(` + { "items": [ + {"create": {"status": 200}}, + {"create": { + "error" : { + "root_cause" : [ + { + "type" : "mapper_parsing_exception", + "reason" : "failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'" + } + ], + "type" : "mapper_parsing_exception", + "reason" : "failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'", + "caused_by" : { + "type" : "illegal_argument_exception", + "reason" : "For input string: \"bar1\"" + } + }, + "status" : 400 + } + }, + {"create": {"status": 200}} + ]} + `) + + event := publisher.Event{Content: beat.Event{Fields: common.MapStr{"bar": 1}}} + eventFail := publisher.Event{Content: beat.Event{Fields: common.MapStr{"bar": "bar1"}}} + events := []publisher.Event{event, eventFail, event} + + res, stats := bulkCollectPublishFails(logp.L(), response, events, "death_letter_index") + assert.Equal(t, 1, len(res)) + if len(res) == 1 { + expected := publisher.Event{ + Content: beat.Event{ + Fields: common.MapStr{ + "message": "{\"bar\":\"bar1\"}", + "error.type": 400, + "error.message": "{\n\t\t\t\"root_cause\" : [\n\t\t\t {\n\t\t\t\t\"type\" : \"mapper_parsing_exception\",\n\t\t\t\t\"reason\" : \"failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'\"\n\t\t\t }\n\t\t\t],\n\t\t\t\"type\" : \"mapper_parsing_exception\",\n\t\t\t\"reason\" : \"failed to parse field [bar] of type [long] in document with id '1'. 
Preview of field's value: 'bar1'\",\n\t\t\t\"caused_by\" : {\n\t\t\t \"type\" : \"illegal_argument_exception\",\n\t\t\t \"reason\" : \"For input string: \\\"bar1\\\"\"\n\t\t\t}\n\t\t }", + }, + Meta: common.MapStr{ + "deathlettered": true, + }, + }, + } + assert.Equal(t, expected, res[0]) + } + assert.Equal(t, bulkResultStats{acked: 2, fails: 1, nonIndexable: 0}, stats) +} + +func TestCollectPublishFailDrop(t *testing.T) { + response := []byte(` + { "items": [ + {"create": {"status": 200}}, + {"create": { + "error" : { + "root_cause" : [ + { + "type" : "mapper_parsing_exception", + "reason" : "failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'" + } + ], + "type" : "mapper_parsing_exception", + "reason" : "failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'", + "caused_by" : { + "type" : "illegal_argument_exception", + "reason" : "For input string: \"bar1\"" + } + }, + "status" : 400 + } + }, + {"create": {"status": 200}} + ]} + `) + + event := publisher.Event{Content: beat.Event{Fields: common.MapStr{"bar": 1}}} + eventFail := publisher.Event{Content: beat.Event{Fields: common.MapStr{"bar": "bar1"}}} + events := []publisher.Event{event, eventFail, event} + + res, stats := bulkCollectPublishFails(logp.L(), response, events, "drop") + assert.Equal(t, 0, len(res)) + assert.Equal(t, bulkResultStats{acked: 2, fails: 0, nonIndexable: 1}, stats) } func TestCollectPublishFailAll(t *testing.T) { @@ -91,7 +176,7 @@ func TestCollectPublishFailAll(t *testing.T) { event := publisher.Event{Content: beat.Event{Fields: common.MapStr{"field": 2}}} events := []publisher.Event{event, event, event} - res, stats := bulkCollectPublishFails(logp.L(), response, events) + res, stats := bulkCollectPublishFails(logp.L(), response, events, "drop") assert.Equal(t, 3, len(res)) assert.Equal(t, events, res) assert.Equal(t, stats, bulkResultStats{fails: 3, tooMany: 3}) @@ -132,7 +217,7 @@ func 
TestCollectPipelinePublishFail(t *testing.T) { event := publisher.Event{Content: beat.Event{Fields: common.MapStr{"field": 2}}} events := []publisher.Event{event} - res, _ := bulkCollectPublishFails(logp.L(), response, events) + res, _ := bulkCollectPublishFails(logp.L(), response, events, "drop") assert.Equal(t, 1, len(res)) assert.Equal(t, events, res) } @@ -150,7 +235,7 @@ func BenchmarkCollectPublishFailsNone(b *testing.B) { events := []publisher.Event{event, event, event} for i := 0; i < b.N; i++ { - res, _ := bulkCollectPublishFails(logp.L(), response, events) + res, _ := bulkCollectPublishFails(logp.L(), response, events, "") if len(res) != 0 { b.Fail() } @@ -171,7 +256,7 @@ func BenchmarkCollectPublishFailMiddle(b *testing.B) { events := []publisher.Event{event, eventFail, event} for i := 0; i < b.N; i++ { - res, _ := bulkCollectPublishFails(logp.L(), response, events) + res, _ := bulkCollectPublishFails(logp.L(), response, events, "") if len(res) != 1 { b.Fail() } @@ -191,7 +276,7 @@ func BenchmarkCollectPublishFailAll(b *testing.B) { events := []publisher.Event{event, event, event} for i := 0; i < b.N; i++ { - res, _ := bulkCollectPublishFails(logp.L(), response, events) + res, _ := bulkCollectPublishFails(logp.L(), response, events, "drop") if len(res) != 3 { b.Fail() } diff --git a/libbeat/outputs/elasticsearch/config.go b/libbeat/outputs/elasticsearch/config.go index bf2f7932fba..b3fcf91df4b 100644 --- a/libbeat/outputs/elasticsearch/config.go +++ b/libbeat/outputs/elasticsearch/config.go @@ -26,20 +26,21 @@ import ( ) type elasticsearchConfig struct { - Protocol string `config:"protocol"` - Path string `config:"path"` - Params map[string]string `config:"parameters"` - Headers map[string]string `config:"headers"` - Username string `config:"username"` - Password string `config:"password"` - APIKey string `config:"api_key"` - LoadBalance bool `config:"loadbalance"` - CompressionLevel int `config:"compression_level" validate:"min=0, max=9"` - EscapeHTML 
bool `config:"escape_html"` - Kerberos *kerberos.Config `config:"kerberos"` - BulkMaxSize int `config:"bulk_max_size"` - MaxRetries int `config:"max_retries"` - Backoff Backoff `config:"backoff"` + Protocol string `config:"protocol"` + Path string `config:"path"` + Params map[string]string `config:"parameters"` + Headers map[string]string `config:"headers"` + Username string `config:"username"` + Password string `config:"password"` + APIKey string `config:"api_key"` + LoadBalance bool `config:"loadbalance"` + CompressionLevel int `config:"compression_level" validate:"min=0, max=9"` + EscapeHTML bool `config:"escape_html"` + Kerberos *kerberos.Config `config:"kerberos"` + BulkMaxSize int `config:"bulk_max_size"` + MaxRetries int `config:"max_retries"` + Backoff Backoff `config:"backoff"` + NonIndexablePolicy NonIndexablePolicy `config:"non_indexable_policy"` Transport httpcommon.HTTPTransportSettings `config:",inline"` } @@ -49,6 +50,11 @@ type Backoff struct { Max time.Duration } +type NonIndexablePolicy struct { + Index string + Action string +} + const ( defaultBulkSize = 50 ) @@ -70,6 +76,10 @@ var ( Init: 1 * time.Second, Max: 60 * time.Second, }, + NonIndexablePolicy: NonIndexablePolicy{ + Action: "drop", + Index: "", + }, Transport: httpcommon.DefaultHTTPTransportSettings(), } ) @@ -79,5 +89,22 @@ func (c *elasticsearchConfig) Validate() error { return fmt.Errorf("cannot set both api_key and username/password") } + if !stringInSlice(c.NonIndexablePolicy.Action, []string{"drop", "death_letter_index"}) { + return fmt.Errorf("invalid value for non_indexable_policy.action: %s, supported values are: drop, death_letter_index", c.NonIndexablePolicy.Action) + } + + if c.NonIndexablePolicy.Action == "death_letter_index" && c.NonIndexablePolicy.Index == "" { + return fmt.Errorf("empty or missing value for non_indexable_policy.index while 'non_indexable_policy.action: death_letter_index' is set ") + } + return nil } + +func stringInSlice(str string, list []string) bool 
{ + for _, v := range list { + if v == str { + return true + } + } + return false +} diff --git a/libbeat/outputs/elasticsearch/death_letter_selector.go b/libbeat/outputs/elasticsearch/death_letter_selector.go new file mode 100644 index 00000000000..942d2f9f8bf --- /dev/null +++ b/libbeat/outputs/elasticsearch/death_letter_selector.go @@ -0,0 +1,19 @@ +package elasticsearch + +import ( + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/outputs" +) + +type DeathLetterSelector struct { + Selector outputs.IndexSelector + DeathLetterIndex string +} + +func (d DeathLetterSelector) Select(event *beat.Event) (string, error) { + result, _ := event.Meta.HasKey("deathlettered") + if result { + return d.DeathLetterIndex, nil + } + return d.Selector.Select(event) +} diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index 682d0e5a41e..ff62179b512 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -67,6 +67,13 @@ func makeES( params = nil } + if config.NonIndexablePolicy.Action == "death_letter_index" { + index = DeathLetterSelector{ + Selector: index, + DeathLetterIndex: config.NonIndexablePolicy.Index, + } + } + clients := make([]outputs.NetworkClient, len(hosts)) for i, host := range hosts { esURL, err := common.MakeURL(config.Protocol, config.Path, host, 9200) @@ -90,9 +97,10 @@ func makeES( EscapeHTML: config.EscapeHTML, Transport: config.Transport, }, - Index: index, - Pipeline: pipeline, - Observer: observer, + Index: index, + Pipeline: pipeline, + Observer: observer, + NonIndexableAction: config.NonIndexablePolicy.Action, }, &connectCallbackRegistry) if err != nil { return outputs.Fail(err) From 598adca2425816e45a177843234e8cfb579b8023 Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Mon, 19 Jul 2021 22:48:48 +0200 Subject: [PATCH 02/13] Adding license header --- .../elasticsearch/death_letter_selector.go | 17 
+++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/libbeat/outputs/elasticsearch/death_letter_selector.go b/libbeat/outputs/elasticsearch/death_letter_selector.go index 942d2f9f8bf..142b38b6d7c 100644 --- a/libbeat/outputs/elasticsearch/death_letter_selector.go +++ b/libbeat/outputs/elasticsearch/death_letter_selector.go @@ -1,3 +1,20 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ package elasticsearch import ( From e2b2b514c3feda3f7b23fd7d01e089476d38b664 Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Wed, 28 Jul 2021 10:09:58 +0200 Subject: [PATCH 03/13] fixing review comment - bad style assigning nothing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Noémi Ványi --- libbeat/outputs/elasticsearch/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index d86ad701e49..8781c7973fa 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -417,7 +417,7 @@ func bulkCollectPublishFails( "deathlettered": true, } } else { - _, _ = data[i].Content.Meta.Put("deathlettered", true) + data[i].Content.Meta.Put("deathlettered", true) } data[i].Content.Fields = common.MapStr{ "message": data[i].Content.Fields.String(), From 1b94b40842fcd15914fea0769702249574e4e83a Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Wed, 28 Jul 2021 18:30:47 +0200 Subject: [PATCH 04/13] buldCollectPublishFails could be a function of Client so not to have to pass arguments --- libbeat/outputs/elasticsearch/client.go | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 8781c7973fa..8a41cc34b56 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -237,7 +237,7 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) failedEvents = data stats.fails = len(failedEvents) } else { - failedEvents, stats = bulkCollectPublishFails(client.log, result, data, client.NonIndexableAction) + failedEvents, stats = client.bulkCollectPublishFails(result, data) } failed := len(failedEvents) @@ -366,15 +366,10 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) 
(string, error) // to be tried again due to error code returned for that items. If indexing an // event failed due to some error in the event itself (e.g. does not respect mapping), // the event will be dropped. -func bulkCollectPublishFails( - log *logp.Logger, - result eslegclient.BulkResult, - data []publisher.Event, - action string, -) ([]publisher.Event, bulkResultStats) { +func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, data []publisher.Event) ([]publisher.Event, bulkResultStats) { reader := newJSONReader(result) if err := bulkReadToItems(reader); err != nil { - log.Errorf("failed to parse bulk response: %v", err.Error()) + client.log.Errorf("failed to parse bulk response: %v", err.Error()) return nil, bulkResultStats{} } @@ -382,9 +377,9 @@ func bulkCollectPublishFails( failed := data[:0] stats := bulkResultStats{} for i := 0; i < count; i++ { - status, msg, err := bulkReadItemStatus(log, reader) + status, msg, err := bulkReadItemStatus(client.log, reader) if err != nil { - log.Error(err) + client.log.Error(err) return nil, bulkResultStats{} } @@ -408,10 +403,10 @@ func bulkCollectPublishFails( result, _ := data[i].Content.Meta.HasKey("deathlettered") if result { stats.nonIndexable++ - log.Errorf("Can't deliver to death letter index event %#v (status=%v): %s", data[i], status, msg) + client.log.Errorf("Can't deliver to death letter index event %#v (status=%v): %s", data[i], status, msg) // poison pill - this will clog the pipeline if the underlying failure is non transient. 
- } else if action == "death_letter_index" { - log.Warnf("Cannot index event %#v (status=%v): %s, trying death letter index", data[i], status, msg) + } else if client.NonIndexableAction == "death_letter_index" { + client.log.Warnf("Cannot index event %#v (status=%v): %s, trying death letter index", data[i], status, msg) if data[i].Content.Meta == nil { data[i].Content.Meta = common.MapStr{ "deathlettered": true, @@ -426,13 +421,13 @@ func bulkCollectPublishFails( } } else { // drop stats.nonIndexable++ - log.Warnf("Cannot index event %#v (status=%v): %s, dropping event!", data[i], status, msg) + client.log.Warnf("Cannot index event %#v (status=%v): %s, dropping event!", data[i], status, msg) continue } } } - log.Debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg) + client.log.Debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg) stats.fails++ failed = append(failed, data[i]) } From a337a8978a33676f6aef9873810ebcbb651d5004 Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Wed, 28 Jul 2021 21:44:34 +0200 Subject: [PATCH 05/13] Making more functions of client to reduce parameter lists --- libbeat/outputs/elasticsearch/client.go | 34 ++---- libbeat/outputs/elasticsearch/client_test.go | 111 ++++++++++++++++-- .../elasticsearch/elasticsearch_test.go | 12 +- 3 files changed, 122 insertions(+), 35 deletions(-) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 8a41cc34b56..7604dd818dc 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -168,6 +168,7 @@ func (client *Client) Clone() *Client { ConnectionSettings: connection, Index: client.index, Pipeline: client.pipeline, + NonIndexableAction: client.NonIndexableAction, }, nil, // XXX: do not pass connection callback? 
) @@ -206,7 +207,7 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) // events slice origCount := len(data) span.Context.SetLabel("events_original", origCount) - data, bulkItems := bulkEncodePublishRequest(client.log, client.conn.GetVersion(), client.index, client.pipeline, data) + data, bulkItems := client.bulkEncodePublishRequest(client.conn.GetVersion(), data) newCount := len(data) span.Context.SetLabel("events_encoded", newCount) if st != nil && origCount > newCount { @@ -265,21 +266,14 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) // bulkEncodePublishRequest encodes all bulk requests and returns slice of events // successfully added to the list of bulk items and the list of bulk items. -func bulkEncodePublishRequest( - log *logp.Logger, - version common.Version, - index outputs.IndexSelector, - pipeline *outil.Selector, - data []publisher.Event, -) ([]publisher.Event, []interface{}) { - +func (client *Client) bulkEncodePublishRequest(version common.Version, data []publisher.Event) ([]publisher.Event, []interface{}) { okEvents := data[:0] bulkItems := []interface{}{} for i := range data { event := &data[i].Content - meta, err := createEventBulkMeta(log, version, index, pipeline, event) + meta, err := client.createEventBulkMeta(version, event) if err != nil { - log.Errorf("Failed to encode event meta data: %+v", err) + client.log.Errorf("Failed to encode event meta data: %+v", err) continue } if opType := events.GetOpType(*event); opType == events.OpTypeDelete { @@ -293,25 +287,19 @@ func bulkEncodePublishRequest( return okEvents, bulkItems } -func createEventBulkMeta( - log *logp.Logger, - version common.Version, - indexSel outputs.IndexSelector, - pipelineSel *outil.Selector, - event *beat.Event, -) (interface{}, error) { +func (client *Client) createEventBulkMeta(version common.Version, event *beat.Event) (interface{}, error) { eventType := "" if version.Major < 7 { eventType = 
defaultEventType } - pipeline, err := getPipeline(event, pipelineSel) + pipeline, err := client.getPipeline(event) if err != nil { err := fmt.Errorf("failed to select pipeline: %v", err) return nil, err } - index, err := indexSel.Select(event) + index, err := client.index.Select(event) if err != nil { err := fmt.Errorf("failed to select event index: %v", err) return nil, err @@ -343,7 +331,7 @@ func createEventBulkMeta( return eslegclient.BulkIndexAction{Index: meta}, nil } -func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error) { +func (client *Client) getPipeline(event *beat.Event) (string, error) { if event.Meta != nil { pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline) if err == common.ErrKeyNotFound { @@ -356,8 +344,8 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error) return strings.ToLower(pipeline), nil } - if pipelineSel != nil { - return pipelineSel.Select(event) + if client.pipeline != nil { + return client.pipeline.Select(event) } return "", nil } diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 0a7dcf31148..7b7fae9a1d3 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -44,6 +44,14 @@ import ( ) func TestCollectPublishFailsNone(t *testing.T) { + client, err := NewClient( + ClientSettings{ + NonIndexableAction: "drop", + }, + nil, + ) + assert.NoError(t, err) + N := 100 item := `{"create": {"status": 200}},` response := []byte(`{"items": [` + strings.Repeat(item, N) + `]}`) @@ -54,11 +62,19 @@ func TestCollectPublishFailsNone(t *testing.T) { events[i] = publisher.Event{Content: beat.Event{Fields: event}} } - res, _ := bulkCollectPublishFails(logp.L(), response, events, "drop") + res, _ := client.bulkCollectPublishFails(response, events) assert.Equal(t, 0, len(res)) } func TestCollectPublishFailMiddle(t *testing.T) { + client, err := NewClient( + 
ClientSettings{ + NonIndexableAction: "drop", + }, + nil, + ) + assert.NoError(t, err) + response := []byte(` { "items": [ {"create": {"status": 200}}, @@ -71,7 +87,7 @@ func TestCollectPublishFailMiddle(t *testing.T) { eventFail := publisher.Event{Content: beat.Event{Fields: common.MapStr{"field": 2}}} events := []publisher.Event{event, eventFail, event} - res, stats := bulkCollectPublishFails(logp.L(), response, events, "drop") + res, stats := client.bulkCollectPublishFails(response, events) assert.Equal(t, 1, len(res)) if len(res) == 1 { assert.Equal(t, eventFail, res[0]) @@ -80,6 +96,14 @@ func TestCollectPublishFailMiddle(t *testing.T) { } func TestCollectPublishFailDeathLetterQueue(t *testing.T) { + client, err := NewClient( + ClientSettings{ + NonIndexableAction: "death_letter_index", + }, + nil, + ) + assert.NoError(t, err) + response := []byte(` { "items": [ {"create": {"status": 200}}, @@ -109,7 +133,7 @@ func TestCollectPublishFailDeathLetterQueue(t *testing.T) { eventFail := publisher.Event{Content: beat.Event{Fields: common.MapStr{"bar": "bar1"}}} events := []publisher.Event{event, eventFail, event} - res, stats := bulkCollectPublishFails(logp.L(), response, events, "death_letter_index") + res, stats := client.bulkCollectPublishFails(response, events) assert.Equal(t, 1, len(res)) if len(res) == 1 { expected := publisher.Event{ @@ -130,6 +154,14 @@ func TestCollectPublishFailDeathLetterQueue(t *testing.T) { } func TestCollectPublishFailDrop(t *testing.T) { + client, err := NewClient( + ClientSettings{ + NonIndexableAction: "drop", + }, + nil, + ) + assert.NoError(t, err) + response := []byte(` { "items": [ {"create": {"status": 200}}, @@ -159,12 +191,20 @@ func TestCollectPublishFailDrop(t *testing.T) { eventFail := publisher.Event{Content: beat.Event{Fields: common.MapStr{"bar": "bar1"}}} events := []publisher.Event{event, eventFail, event} - res, stats := bulkCollectPublishFails(logp.L(), response, events, "drop") + res, stats := 
client.bulkCollectPublishFails(response, events) assert.Equal(t, 0, len(res)) assert.Equal(t, bulkResultStats{acked: 2, fails: 0, nonIndexable: 1}, stats) } func TestCollectPublishFailAll(t *testing.T) { + client, err := NewClient( + ClientSettings{ + NonIndexableAction: "drop", + }, + nil, + ) + assert.NoError(t, err) + response := []byte(` { "items": [ {"create": {"status": 429, "error": "ups"}}, @@ -176,7 +216,7 @@ func TestCollectPublishFailAll(t *testing.T) { event := publisher.Event{Content: beat.Event{Fields: common.MapStr{"field": 2}}} events := []publisher.Event{event, event, event} - res, stats := bulkCollectPublishFails(logp.L(), response, events, "drop") + res, stats := client.bulkCollectPublishFails(response, events) assert.Equal(t, 3, len(res)) assert.Equal(t, events, res) assert.Equal(t, stats, bulkResultStats{fails: 3, tooMany: 3}) @@ -185,6 +225,14 @@ func TestCollectPublishFailAll(t *testing.T) { func TestCollectPipelinePublishFail(t *testing.T) { logp.TestingSetup(logp.WithSelectors("elasticsearch")) + client, err := NewClient( + ClientSettings{ + NonIndexableAction: "drop", + }, + nil, + ) + assert.NoError(t, err) + response := []byte(`{ "took": 0, "ingest_took": 0, "errors": true, "items": [ @@ -217,12 +265,20 @@ func TestCollectPipelinePublishFail(t *testing.T) { event := publisher.Event{Content: beat.Event{Fields: common.MapStr{"field": 2}}} events := []publisher.Event{event} - res, _ := bulkCollectPublishFails(logp.L(), response, events, "drop") + res, _ := client.bulkCollectPublishFails(response, events) assert.Equal(t, 1, len(res)) assert.Equal(t, events, res) } func BenchmarkCollectPublishFailsNone(b *testing.B) { + client, err := NewClient( + ClientSettings{ + NonIndexableAction: "drop", + }, + nil, + ) + assert.NoError(b, err) + response := []byte(` { "items": [ {"create": {"status": 200}}, @@ -235,7 +291,7 @@ func BenchmarkCollectPublishFailsNone(b *testing.B) { events := []publisher.Event{event, event, event} for i := 0; i < b.N; i++ 
{ - res, _ := bulkCollectPublishFails(logp.L(), response, events, "") + res, _ := client.bulkCollectPublishFails(response, events) if len(res) != 0 { b.Fail() } @@ -243,6 +299,14 @@ func BenchmarkCollectPublishFailsNone(b *testing.B) { } func BenchmarkCollectPublishFailMiddle(b *testing.B) { + client, err := NewClient( + ClientSettings{ + NonIndexableAction: "drop", + }, + nil, + ) + assert.NoError(b, err) + response := []byte(` { "items": [ {"create": {"status": 200}}, @@ -256,7 +320,7 @@ func BenchmarkCollectPublishFailMiddle(b *testing.B) { events := []publisher.Event{event, eventFail, event} for i := 0; i < b.N; i++ { - res, _ := bulkCollectPublishFails(logp.L(), response, events, "") + res, _ := client.bulkCollectPublishFails(response, events) if len(res) != 1 { b.Fail() } @@ -264,6 +328,14 @@ func BenchmarkCollectPublishFailMiddle(b *testing.B) { } func BenchmarkCollectPublishFailAll(b *testing.B) { + client, err := NewClient( + ClientSettings{ + NonIndexableAction: "drop", + }, + nil, + ) + assert.NoError(b, err) + response := []byte(` { "items": [ {"creatMiddlee": {"status": 429, "error": "ups"}}, @@ -276,7 +348,7 @@ func BenchmarkCollectPublishFailAll(b *testing.B) { events := []publisher.Event{event, event, event} for i := 0; i < b.N; i++ { - res, _ := bulkCollectPublishFails(logp.L(), response, events, "drop") + res, _ := client.bulkCollectPublishFails(response, events) if len(res) != 3 { b.Fail() } @@ -380,7 +452,16 @@ func TestBulkEncodeEvents(t *testing.T) { } } - encoded, bulkItems := bulkEncodePublishRequest(logp.L(), *common.MustNewVersion(test.version), index, pipeline, events) + client, err := NewClient( + ClientSettings{ + Index: index, + Pipeline: pipeline, + }, + nil, + ) + assert.NoError(t, err) + + encoded, bulkItems := client.bulkEncodePublishRequest(*common.MustNewVersion(test.version), events) assert.Equal(t, len(events), len(encoded), "all events should have been encoded") assert.Equal(t, 2*len(events), len(bulkItems), "incomplete bulk") 
@@ -446,7 +527,15 @@ func TestBulkEncodeEventsWithOpType(t *testing.T) { } } - encoded, bulkItems := bulkEncodePublishRequest(logp.L(), *common.MustNewVersion(version.GetDefaultVersion()), index, pipeline, events) + client, err := NewClient( + ClientSettings{ + Index: index, + Pipeline: pipeline, + }, + nil, + ) + + encoded, bulkItems := client.bulkEncodePublishRequest(*common.MustNewVersion(version.GetDefaultVersion()), events) require.Equal(t, len(events)-1, len(encoded), "all events should have been encoded") require.Equal(t, 9, len(bulkItems), "incomplete bulk") diff --git a/libbeat/outputs/elasticsearch/elasticsearch_test.go b/libbeat/outputs/elasticsearch/elasticsearch_test.go index df757d570dd..0e1e72ab9cc 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch_test.go +++ b/libbeat/outputs/elasticsearch/elasticsearch_test.go @@ -19,6 +19,7 @@ package elasticsearch import ( "fmt" + "github.com/stretchr/testify/assert" "testing" "github.com/elastic/beats/v7/libbeat/beat" @@ -116,11 +117,20 @@ func TestPipelineSelection(t *testing.T) { for name, test := range cases { t.Run(name, func(t *testing.T) { selector, err := buildPipelineSelector(common.MustNewConfigFrom(test.cfg)) + + client, err := NewClient( + ClientSettings{ + Pipeline: &selector, + }, + nil, + ) + assert.NoError(t, err) + if err != nil { t.Fatalf("Failed to parse configuration: %v", err) } - got, err := getPipeline(&test.event, &selector) + got, err := client.getPipeline(&test.event) if err != nil { t.Fatalf("Failed to create pipeline name: %v", err) } From cd2280f8e6ea28a12a50ac95f93619d0bf41d081 Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Thu, 29 Jul 2021 01:13:53 +0200 Subject: [PATCH 06/13] - death letter -> dead letter :-) - using constants over string literals - Improved config as suggested by review - added test for config --- libbeat/outputs/elasticsearch/client.go | 12 +- .../elasticsearch/client_integration_test.go | 14 +-- libbeat/outputs/elasticsearch/client_test.go | 6 +- 
libbeat/outputs/elasticsearch/config.go | 57 +++------- libbeat/outputs/elasticsearch/config_test.go | 104 ++++++++++++++++++ .../elasticsearch/death_letter_selector.go | 12 +- .../outputs/elasticsearch/elasticsearch.go | 16 ++- .../elasticsearch/non_indexable_policy.go | 100 +++++++++++++++++ 8 files changed, 253 insertions(+), 68 deletions(-) create mode 100644 libbeat/outputs/elasticsearch/config_test.go create mode 100644 libbeat/outputs/elasticsearch/non_indexable_policy.go diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 7604dd818dc..318678007c4 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -388,19 +388,19 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat stats.tooMany++ } else { // hard failure, apply policy action - result, _ := data[i].Content.Meta.HasKey("deathlettered") + result, _ := data[i].Content.Meta.HasKey(dead_letter_marker_field) if result { stats.nonIndexable++ - client.log.Errorf("Can't deliver to death letter index event %#v (status=%v): %s", data[i], status, msg) + client.log.Errorf("Can't deliver to dead letter index event %#v (status=%v): %s", data[i], status, msg) // poison pill - this will clog the pipeline if the underlying failure is non transient. 
- } else if client.NonIndexableAction == "death_letter_index" { - client.log.Warnf("Cannot index event %#v (status=%v): %s, trying death letter index", data[i], status, msg) + } else if client.NonIndexableAction == dead_letter_index { + client.log.Warnf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg) if data[i].Content.Meta == nil { data[i].Content.Meta = common.MapStr{ - "deathlettered": true, + dead_letter_marker_field: true, } } else { - data[i].Content.Meta.Put("deathlettered", true) + data[i].Content.Meta.Put(dead_letter_marker_field, true) } data[i].Content.Fields = common.MapStr{ "message": data[i].Content.Fields.String(), diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index d1959a5bff4..a505b10921a 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -200,23 +200,23 @@ func TestClientPublishEventWithPipeline(t *testing.T) { assert.Equal(t, 1, getCount("testfield:0")) // no pipeline } -func TestClientBulkPublishEventsWithDeathletterIndex(t *testing.T) { +func TestClientBulkPublishEventsWithDeadletterIndex(t *testing.T) { type obj map[string]interface{} logp.TestingSetup(logp.WithSelectors("elasticsearch")) index := "beat-int-test-dli-index" - deathletterIndex := "beat-int-test-dli-deathletter-index" + deadletterIndex := "beat-int-test-dli-dead-letter-index" output, client := connectTestEsWithoutStats(t, obj{ "index": index, "non_indexable_policy": NonIndexablePolicy{ - Action: "death_letter_index", - Index: deathletterIndex, + Action: "dead_letter_index", + Index: deadletterIndex, }, }) client.conn.Delete(index, "", "", nil) - client.conn.Delete(deathletterIndex, "", "", nil) + client.conn.Delete(deadletterIndex, "", "", nil) err := output.Publish(context.Background(), outest.NewBatch(beat.Event{ Timestamp: time.Now(), @@ -242,7 +242,7 @@ func 
TestClientBulkPublishEventsWithDeathletterIndex(t *testing.T) { if err == nil { t.Fatal("Expecting mapping conflict") } - _, _, err = client.conn.Refresh(deathletterIndex) + _, _, err = client.conn.Refresh(deadletterIndex) if err == nil { t.Fatal("expecting index to not exist yet") } @@ -256,7 +256,7 @@ func TestClientBulkPublishEventsWithDeathletterIndex(t *testing.T) { t.Fatal(err) } - _, _, err = client.conn.Refresh(deathletterIndex) + _, _, err = client.conn.Refresh(deadletterIndex) if err != nil { t.Fatal(err) } diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 7b7fae9a1d3..02be8038c40 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -95,10 +95,10 @@ func TestCollectPublishFailMiddle(t *testing.T) { assert.Equal(t, bulkResultStats{acked: 2, fails: 1, tooMany: 1}, stats) } -func TestCollectPublishFailDeathLetterQueue(t *testing.T) { +func TestCollectPublishFailDeadLetterQueue(t *testing.T) { client, err := NewClient( ClientSettings{ - NonIndexableAction: "death_letter_index", + NonIndexableAction: "dead_letter_index", }, nil, ) @@ -144,7 +144,7 @@ func TestCollectPublishFailDeathLetterQueue(t *testing.T) { "error.message": "{\n\t\t\t\"root_cause\" : [\n\t\t\t {\n\t\t\t\t\"type\" : \"mapper_parsing_exception\",\n\t\t\t\t\"reason\" : \"failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'\"\n\t\t\t }\n\t\t\t],\n\t\t\t\"type\" : \"mapper_parsing_exception\",\n\t\t\t\"reason\" : \"failed to parse field [bar] of type [long] in document with id '1'. 
Preview of field's value: 'bar1'\",\n\t\t\t\"caused_by\" : {\n\t\t\t \"type\" : \"illegal_argument_exception\",\n\t\t\t \"reason\" : \"For input string: \\\"bar1\\\"\"\n\t\t\t}\n\t\t }", }, Meta: common.MapStr{ - "deathlettered": true, + dead_letter_marker_field: true, }, }, } diff --git a/libbeat/outputs/elasticsearch/config.go b/libbeat/outputs/elasticsearch/config.go index b3fcf91df4b..7c8645d7d0d 100644 --- a/libbeat/outputs/elasticsearch/config.go +++ b/libbeat/outputs/elasticsearch/config.go @@ -19,6 +19,7 @@ package elasticsearch import ( "fmt" + "github.com/elastic/beats/v7/libbeat/common" "time" "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" @@ -26,21 +27,21 @@ import ( ) type elasticsearchConfig struct { - Protocol string `config:"protocol"` - Path string `config:"path"` - Params map[string]string `config:"parameters"` - Headers map[string]string `config:"headers"` - Username string `config:"username"` - Password string `config:"password"` - APIKey string `config:"api_key"` - LoadBalance bool `config:"loadbalance"` - CompressionLevel int `config:"compression_level" validate:"min=0, max=9"` - EscapeHTML bool `config:"escape_html"` - Kerberos *kerberos.Config `config:"kerberos"` - BulkMaxSize int `config:"bulk_max_size"` - MaxRetries int `config:"max_retries"` - Backoff Backoff `config:"backoff"` - NonIndexablePolicy NonIndexablePolicy `config:"non_indexable_policy"` + Protocol string `config:"protocol"` + Path string `config:"path"` + Params map[string]string `config:"parameters"` + Headers map[string]string `config:"headers"` + Username string `config:"username"` + Password string `config:"password"` + APIKey string `config:"api_key"` + LoadBalance bool `config:"loadbalance"` + CompressionLevel int `config:"compression_level" validate:"min=0, max=9"` + EscapeHTML bool `config:"escape_html"` + Kerberos *kerberos.Config `config:"kerberos"` + BulkMaxSize int `config:"bulk_max_size"` + MaxRetries int `config:"max_retries"` + Backoff Backoff 
`config:"backoff"` + NonIndexablePolicy *common.ConfigNamespace `config:"non_indexable_policy"` Transport httpcommon.HTTPTransportSettings `config:",inline"` } @@ -50,11 +51,6 @@ type Backoff struct { Max time.Duration } -type NonIndexablePolicy struct { - Index string - Action string -} - const ( defaultBulkSize = 50 ) @@ -76,10 +72,6 @@ var ( Init: 1 * time.Second, Max: 60 * time.Second, }, - NonIndexablePolicy: NonIndexablePolicy{ - Action: "drop", - Index: "", - }, Transport: httpcommon.DefaultHTTPTransportSettings(), } ) @@ -89,22 +81,5 @@ func (c *elasticsearchConfig) Validate() error { return fmt.Errorf("cannot set both api_key and username/password") } - if !stringInSlice(c.NonIndexablePolicy.Action, []string{"drop", "death_letter_index"}) { - return fmt.Errorf("invalid value for non_indexable_policy.action: %s, supported values are: drop, death_letter_index", c.NonIndexablePolicy.Action) - } - - if c.NonIndexablePolicy.Action == "death_letter_index" && c.NonIndexablePolicy.Index == "" { - return fmt.Errorf("empty or missing value for non_indexable_policy.index while 'non_indexable_policy.action: death_letter_index' is set ") - } - return nil } - -func stringInSlice(str string, list []string) bool { - for _, v := range list { - if v == str { - return true - } - } - return false -} diff --git a/libbeat/outputs/elasticsearch/config_test.go b/libbeat/outputs/elasticsearch/config_test.go new file mode 100644 index 00000000000..a73b4404edc --- /dev/null +++ b/libbeat/outputs/elasticsearch/config_test.go @@ -0,0 +1,104 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package elasticsearch + +import ( + "github.com/elastic/beats/v7/libbeat/common" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestValidDropPolicyConfig(t *testing.T) { + config := ` +non_indexable_policy.drop: ~ +` + c := common.MustNewConfigFrom(config) + elasticsearchOutputConfig, err := readConfig(c) + if err != nil { + t.Fatalf("Can't create test configuration from valid input") + } + policy, err := newNonIndexablePolicy(elasticsearchOutputConfig.NonIndexablePolicy) + if err != nil { + t.Fatalf("Can't create test configuration from valid input") + } + assert.Equal(t, drop, policy.action(), "action should be drop") +} + +func TestDeadLetterIndexPolicyConfig(t *testing.T) { + config := ` +non_indexable_policy.dead_letter_index: + index: "my-dead-letter-index" +` + c := common.MustNewConfigFrom(config) + elasticsearchOutputConfig, err := readConfig(c) + if err != nil { + t.Fatalf("Can't create test configuration from valid input") + } + policy, err := newNonIndexablePolicy(elasticsearchOutputConfig.NonIndexablePolicy) + if err != nil { + t.Fatalf("Can't create test configuration from valid input") + } + assert.Equal(t, "my-dead-letter-index", policy.index(), "index should match config") +} + +func TestInvalidNonIndexablePolicyConfig(t *testing.T) { + tests := map[string]string{ + "non_indexable_policy with invalid policy": ` +non_indexable_policy.juggle: ~ +`, + "dead_Letter_index policy without properties": ` +non_indexable_policy.dead_letter_index: ~ +`, + "dead_Letter_index policy without index": ` 
+non_indexable_policy.dead_letter_index: + foo: "bar" +`, + "dead_Letter_index policy nil index": ` +non_indexable_policy.dead_letter_index: + index: ~ +`, + "dead_Letter_index policy empty index": ` +non_indexable_policy.dead_letter_index: + index: "" +`, + } + + for name, test := range tests { + test := test + t.Run(name, func(t *testing.T) { + c := common.MustNewConfigFrom(test) + elasticsearchOutputConfig, err := readConfig(c) + if err != nil { + t.Fatalf("Can't create test configuration from valid input") + } + _, err = newNonIndexablePolicy(elasticsearchOutputConfig.NonIndexablePolicy) + if err == nil { + t.Fatalf("Can create test configuration from invalid input") + } + t.Logf("error %s", err.Error()) + }) + } +} + +func readConfig(cfg *common.Config) (*elasticsearchConfig, error) { + c := defaultConfig + if err := cfg.Unpack(&c); err != nil { + return nil, err + } + return &c, nil +} diff --git a/libbeat/outputs/elasticsearch/death_letter_selector.go b/libbeat/outputs/elasticsearch/death_letter_selector.go index 142b38b6d7c..02bd3780cab 100644 --- a/libbeat/outputs/elasticsearch/death_letter_selector.go +++ b/libbeat/outputs/elasticsearch/death_letter_selector.go @@ -22,15 +22,15 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs" ) -type DeathLetterSelector struct { - Selector outputs.IndexSelector - DeathLetterIndex string +type DeadLetterSelector struct { + Selector outputs.IndexSelector + DeadLetterIndex string } -func (d DeathLetterSelector) Select(event *beat.Event) (string, error) { - result, _ := event.Meta.HasKey("deathlettered") +func (d DeadLetterSelector) Select(event *beat.Event) (string, error) { + result, _ := event.Meta.HasKey(dead_letter_marker_field) if result { - return d.DeathLetterIndex, nil + return d.DeadLetterIndex, nil } return d.Selector.Select(event) } diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index ff62179b512..417043eff83 100644 --- 
a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -53,6 +53,12 @@ func makeES( return outputs.Fail(err) } + policy, err := newNonIndexablePolicy(config.NonIndexablePolicy) + if err != nil { + log.Errorf("error while creating non-indexable policy: %v", err) + return outputs.Fail(err) + } + hosts, err := outputs.ReadHostList(cfg) if err != nil { return outputs.Fail(err) } @@ -67,10 +73,10 @@ func makeES( params = nil } - if config.NonIndexablePolicy.Action == "death_letter_index" { - index = DeathLetterSelector{ - Selector: index, - DeathLetterIndex: config.NonIndexablePolicy.Index, + if policy.action() == dead_letter_index { + index = DeadLetterSelector{ + Selector: index, + DeadLetterIndex: policy.index(), } } @@ -100,7 +106,7 @@ func makeES( Index: index, Pipeline: pipeline, Observer: observer, - NonIndexableAction: config.NonIndexablePolicy.Action, + NonIndexableAction: policy.action(), }, &connectCallbackRegistry) if err != nil { return outputs.Fail(err) } diff --git a/libbeat/outputs/elasticsearch/non_indexable_policy.go b/libbeat/outputs/elasticsearch/non_indexable_policy.go new file mode 100644 index 00000000000..318eba9b0af --- /dev/null +++ b/libbeat/outputs/elasticsearch/non_indexable_policy.go @@ -0,0 +1,100 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package elasticsearch + +import ( + "fmt" + "github.com/elastic/beats/v7/libbeat/common" +) + +const ( + dead_letter_marker_field = "deadlettered" + drop = "drop" + dead_letter_index = "dead_letter_index" +) + +type DropPolicy struct{} + +func (d DropPolicy) action() string { + return drop +} + +func (d DropPolicy) index() string { + panic("drop policy doesn't have a target index") +} + +type DeadLetterIndexPolicy struct { + Index string +} + +func (d DeadLetterIndexPolicy) action() string { + return dead_letter_index +} + +func (d DeadLetterIndexPolicy) index() string { + return d.Index +} + +type nonIndexablePolicy interface { + action() string + index() string +} + +var ( + policyFactories = map[string]policyFactory{ + drop: newDropPolicy, + dead_letter_index: newDeadLetterIndexPolicy, + } +) + +func newDeadLetterIndexPolicy(config *common.Config) (nonIndexablePolicy, error) { + policy := DeadLetterIndexPolicy{} + err := config.Unpack(&policy) + if policy.index() == "" { + return nil, fmt.Errorf("%s policy requires an `index` to be specified", dead_letter_index) + } + return policy, err +} + +func newDropPolicy(*common.Config) (nonIndexablePolicy, error) { + return defaultDropPolicy(), nil +} + +func defaultPolicy() nonIndexablePolicy { + return defaultDropPolicy() +} + +func defaultDropPolicy() nonIndexablePolicy { + return &DropPolicy{} +} + +type policyFactory func(config *common.Config) (nonIndexablePolicy, error) + +func newNonIndexablePolicy(configNamespace *common.ConfigNamespace) (nonIndexablePolicy, error) { + if configNamespace == nil { + return defaultPolicy(), nil + } + + policyType := configNamespace.Name() + factory, ok := policyFactories[policyType] + if !ok { + return nil, fmt.Errorf("no such policy type: %s", policyType) + } + + return factory(configNamespace.Config()) +} From 2d9fe0a9be5d2766bef172c6210bdd9afdf00c0a Mon Sep 
17 00:00:00 2001 From: Michael Bischoff Date: Thu, 29 Jul 2021 03:37:43 +0200 Subject: [PATCH 07/13] fmt fix --- libbeat/outputs/elasticsearch/config.go | 3 ++- libbeat/outputs/elasticsearch/config_test.go | 6 ++++-- libbeat/outputs/elasticsearch/elasticsearch_test.go | 3 ++- libbeat/outputs/elasticsearch/non_indexable_policy.go | 1 + 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/libbeat/outputs/elasticsearch/config.go b/libbeat/outputs/elasticsearch/config.go index 7c8645d7d0d..816078f9225 100644 --- a/libbeat/outputs/elasticsearch/config.go +++ b/libbeat/outputs/elasticsearch/config.go @@ -19,9 +19,10 @@ package elasticsearch import ( "fmt" - "github.com/elastic/beats/v7/libbeat/common" "time" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/libbeat/common/transport/kerberos" ) diff --git a/libbeat/outputs/elasticsearch/config_test.go b/libbeat/outputs/elasticsearch/config_test.go index a73b4404edc..15d934a1e86 100644 --- a/libbeat/outputs/elasticsearch/config_test.go +++ b/libbeat/outputs/elasticsearch/config_test.go @@ -18,9 +18,11 @@ package elasticsearch import ( - "github.com/elastic/beats/v7/libbeat/common" - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/common" ) func TestValidDropPolicyConfig(t *testing.T) { diff --git a/libbeat/outputs/elasticsearch/elasticsearch_test.go b/libbeat/outputs/elasticsearch/elasticsearch_test.go index 0e1e72ab9cc..f84d32c2ccd 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch_test.go +++ b/libbeat/outputs/elasticsearch/elasticsearch_test.go @@ -19,9 +19,10 @@ package elasticsearch import ( "fmt" - "github.com/stretchr/testify/assert" "testing" + "github.com/stretchr/testify/assert" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" diff 
--git a/libbeat/outputs/elasticsearch/non_indexable_policy.go b/libbeat/outputs/elasticsearch/non_indexable_policy.go index 318eba9b0af..fa97c5176d9 100644 --- a/libbeat/outputs/elasticsearch/non_indexable_policy.go +++ b/libbeat/outputs/elasticsearch/non_indexable_policy.go @@ -19,6 +19,7 @@ package elasticsearch import ( "fmt" + "github.com/elastic/beats/v7/libbeat/common" ) From 5776371805d0f65ed44a55b11fe5267b8ece9a9d Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Thu, 29 Jul 2021 04:25:56 +0200 Subject: [PATCH 08/13] Adding documentation --- .../elasticsearch/docs/elasticsearch.asciidoc | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc b/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc index 80b2bb36879..ae324cf4243 100644 --- a/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc +++ b/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc @@ -682,3 +682,35 @@ or <> for more information. Configuration options for Kerberos authentication. See <> for more information. + +===== `non_indexable_policy` + +Specifies the behavior when the elasticsearch cluster explicitly rejects documents, for example on mapping conflicts. + +====== `drop` +The default behaviour, when an event is explicitly rejected by elasticsearch it is dropped. + +["source","yaml"] +------------------------------------------------------------------------------ +output.elasticsearch: + hosts: ["http://localhost:9200"] + non_indexable_policy.drop: ~ +------------------------------------------------------------------------------ + +====== `dead_letter_index` +On an explicit rejection, this policy will retry the event in the next batch. However, the target index will change +to index specified. In addition, the structure of the event will be change to the following fields: + +message:: Contains the escaped json of the original event. 
+error.type:: Contains the status code +error.message:: Contains status returned by elasticsearch, describing the reason + +`index`:: The index to send rejected events to. + +["source","yaml"] +------------------------------------------------------------------------------ +output.elasticsearch: + hosts: ["http://localhost:9200"] + non_indexable_policy.dead_letter_index: + index: "my-dead-letter-index" +------------------------------------------------------------------------------ From 8e0881eaa7cbd4fa00234aacbf80e3daa1653d4d Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Thu, 29 Jul 2021 04:29:38 +0200 Subject: [PATCH 09/13] Amending CHANGELOG.next.asciidoc --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c7a8b46c4d3..1f6976c293a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -619,6 +619,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Ensure common proxy settings support in HTTP clients: proxy_disabled, proxy_url, proxy_headers and typical environment variables HTTP_PROXY, HTTPS_PROXY, NOPROXY. {pull}25219[25219] - `add_process_metadata` processor enrich process information with owner name and id. {issue}21068[21068] {pull}21111[21111] - Add proxy support for AWS functions. 
{pull}26832[26832] +- Added policies to the elasticsearch output for non-indexable events {pull}26952[26952] *Auditbeat* From 0e4bc4d9e2493f93518c62a41ea07c577e4036fd Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Fri, 30 Jul 2021 01:11:53 +0200 Subject: [PATCH 10/13] fixing integration test --- libbeat/outputs/elasticsearch/client_integration_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index a505b10921a..048bcc24c8e 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -210,9 +210,8 @@ func TestClientBulkPublishEventsWithDeadletterIndex(t *testing.T) { output, client := connectTestEsWithoutStats(t, obj{ "index": index, - "non_indexable_policy": NonIndexablePolicy{ - Action: "dead_letter_index", - Index: deadletterIndex, + "non_indexable_policy": DeadLetterIndexPolicy{ + Index: deadletterIndex, }, }) client.conn.Delete(index, "", "", nil) From 6fa60b238ed03974ded1589007752094de711c20 Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Fri, 30 Jul 2021 04:08:40 +0200 Subject: [PATCH 11/13] fixing integration test --- libbeat/outputs/elasticsearch/client_integration_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 048bcc24c8e..2ad6a1c79b5 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -210,8 +210,10 @@ func TestClientBulkPublishEventsWithDeadletterIndex(t *testing.T) { output, client := connectTestEsWithoutStats(t, obj{ "index": index, - "non_indexable_policy": DeadLetterIndexPolicy{ - Index: deadletterIndex, + "non_indexable_policy": map[string]interface{}{ + "dead_letter_index": 
map[string]interface{}{ + "index": deadletterIndex, + }, }, }) client.conn.Delete(index, "", "", nil) From 08bee4eaead56796f30a894ae9407689ea7aeb5b Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Mon, 2 Aug 2021 16:27:02 +0200 Subject: [PATCH 12/13] new feature, making it beta with cfgwarn.Beta and beta[] in the documentation. --- libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc | 3 +++ libbeat/outputs/elasticsearch/non_indexable_policy.go | 2 ++ 2 files changed, 5 insertions(+) diff --git a/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc b/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc index ae324cf4243..ebe22854bee 100644 --- a/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc +++ b/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc @@ -698,6 +698,9 @@ output.elasticsearch: ------------------------------------------------------------------------------ ====== `dead_letter_index` + +beta[] + On an explicit rejection, this policy will retry the event in the next batch. However, the target index will change to index specified. 
In addition, the structure of the event will be change to the following fields: diff --git a/libbeat/outputs/elasticsearch/non_indexable_policy.go b/libbeat/outputs/elasticsearch/non_indexable_policy.go index fa97c5176d9..2e781765a79 100644 --- a/libbeat/outputs/elasticsearch/non_indexable_policy.go +++ b/libbeat/outputs/elasticsearch/non_indexable_policy.go @@ -19,6 +19,7 @@ package elasticsearch import ( "fmt" + "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/common" ) @@ -64,6 +65,7 @@ var ( ) func newDeadLetterIndexPolicy(config *common.Config) (nonIndexablePolicy, error) { + cfgwarn.Beta("The non_indexable_policy dead_letter_index is beta.") policy := DeadLetterIndexPolicy{} err := config.Unpack(&policy) if policy.index() == "" { From 0304845f50d130886c7595109612a8a0fdf91550 Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Mon, 2 Aug 2021 17:42:10 +0200 Subject: [PATCH 13/13] fixing lint - intellij must have the wrong settings --- libbeat/outputs/elasticsearch/non_indexable_policy.go | 1 + 1 file changed, 1 insertion(+) diff --git a/libbeat/outputs/elasticsearch/non_indexable_policy.go b/libbeat/outputs/elasticsearch/non_indexable_policy.go index 2e781765a79..8602fdb1245 100644 --- a/libbeat/outputs/elasticsearch/non_indexable_policy.go +++ b/libbeat/outputs/elasticsearch/non_indexable_policy.go @@ -19,6 +19,7 @@ package elasticsearch import ( "fmt" + "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/common"