From d18dc645d81906b4be0a40772562426540f04394 Mon Sep 17 00:00:00 2001
From: Michael Bischoff
Date: Mon, 2 Aug 2021 20:20:05 +0200
Subject: [PATCH] Non-indexable policy: dead letter index (#26952)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Adds the ability to send events to a dead letter index instead of dropping
them. Previously, events that hit a mapping conflict were dropped. This commit
allows such events to be indexed into a different index instead, so they can
be re-ingested later, rather than being dropped or stalling ingest. The dead
letter index currently must reside on the same ES cluster.

* bulkCollectPublishFails and other functions changed to be methods of Client,
  so that arguments no longer have to be passed around

Co-authored-by: Noémi Ványi

(cherry picked from commit 60cf09dcfc133027ff2fd0f9caf223705693a79d)
---
 CHANGELOG.next.asciidoc                       |   2 +
 libbeat/outputs/elasticsearch/client.go       | 101 ++++-----
 .../elasticsearch/client_integration_test.go  |  64 ++++++
 libbeat/outputs/elasticsearch/client_test.go  | 194 +++++++++++++++++-
 libbeat/outputs/elasticsearch/config.go       |  31 +--
 libbeat/outputs/elasticsearch/config_test.go  | 106 ++++++++++
 .../elasticsearch/death_letter_selector.go    |  36 ++++
 .../elasticsearch/docs/elasticsearch.asciidoc |  35 ++++
 .../outputs/elasticsearch/elasticsearch.go    |  20 +-
 .../elasticsearch/elasticsearch_test.go       |  13 +-
 .../elasticsearch/non_indexable_policy.go     | 104 ++++++++++
 11 files changed, 631 insertions(+), 75 deletions(-)
 create mode 100644 libbeat/outputs/elasticsearch/config_test.go
 create mode 100644 libbeat/outputs/elasticsearch/death_letter_selector.go
 create mode 100644 libbeat/outputs/elasticsearch/non_indexable_policy.go

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 9c54c476c19..dc0029103b5 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -469,6 +469,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Libbeat: report beat version to monitoring. {pull}26214[26214]
 - Ensure common proxy settings support in HTTP clients: proxy_disabled, proxy_url, proxy_headers and typical environment variables HTTP_PROXY, HTTPS_PROXY, NOPROXY. {pull}25219[25219]
 - Add proxy support for AWS functions. {pull}26832[26832]
+- Added policies to the elasticsearch output for non-indexable events {pull}26952[26952]
+- Add sha256 digests to RPM packages. {issue}23670[23670]

 *Auditbeat*

diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go
index b388001aeec..318678007c4 100644
--- a/libbeat/outputs/elasticsearch/client.go
+++ b/libbeat/outputs/elasticsearch/client.go
@@ -45,7 +45,8 @@ type Client struct {
 	index    outputs.IndexSelector
 	pipeline *outil.Selector

-	observer outputs.Observer
+	observer           outputs.Observer
+	NonIndexableAction string

 	log *logp.Logger
 }

@@ -53,16 +54,17 @@
 // ClientSettings contains the settings for a client.
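+// NonIndexableAction configures what happens to events that Elasticsearch
+// explicitly rejects: "drop" (the default) discards them, while
+// "dead_letter_index" redirects them to the configured dead letter index.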
 type ClientSettings struct {
 	eslegclient.ConnectionSettings

-	Index    outputs.IndexSelector
-	Pipeline *outil.Selector
-	Observer outputs.Observer
+	Index              outputs.IndexSelector
+	Pipeline           *outil.Selector
+	Observer           outputs.Observer
+	NonIndexableAction string
 }

 type bulkResultStats struct {
 	acked        int // number of events ACKed by Elasticsearch
 	duplicates   int // number of events failed with `create` due to ID already being indexed
 	fails        int // number of failed events (can be retried)
-	nonIndexable int // number of failed events (not indexable -> must be dropped)
+	nonIndexable int // number of failed events (not indexable)
 	tooMany      int // number of events receiving HTTP 429 Too Many Requests
 }

@@ -123,11 +125,11 @@ func NewClient(
 	}

 	client := &Client{
-		conn:     *conn,
-		index:    s.Index,
-		pipeline: pipeline,
-
-		observer: s.Observer,
+		conn:               *conn,
+		index:              s.Index,
+		pipeline:           pipeline,
+		observer:           s.Observer,
+		NonIndexableAction: s.NonIndexableAction,

 		log: logp.NewLogger("elasticsearch"),
 	}
@@ -166,6 +168,7 @@ func (client *Client) Clone() *Client {
 			ConnectionSettings: connection,
 			Index:              client.index,
 			Pipeline:           client.pipeline,
+			NonIndexableAction: client.NonIndexableAction,
 		},
 		nil, // XXX: do not pass connection callback?
 	)
@@ -204,7 +207,7 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
 	// events slice
 	origCount := len(data)
 	span.Context.SetLabel("events_original", origCount)
-	data, bulkItems := bulkEncodePublishRequest(client.log, client.conn.GetVersion(), client.index, client.pipeline, data)
+	data, bulkItems := client.bulkEncodePublishRequest(client.conn.GetVersion(), data)
 	newCount := len(data)
 	span.Context.SetLabel("events_encoded", newCount)
 	if st != nil && origCount > newCount {
@@ -235,7 +238,7 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
 		failedEvents = data
 		stats.fails = len(failedEvents)
 	} else {
-		failedEvents, stats = bulkCollectPublishFails(client.log, result, data)
+		failedEvents, stats = client.bulkCollectPublishFails(result, data)
 	}

 	failed := len(failedEvents)
@@ -263,21 +266,14 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)

 // bulkEncodePublishRequest encodes all bulk requests and returns the slice of
 // events successfully added to the list of bulk items, along with the list of
 // bulk items itself.
-func bulkEncodePublishRequest(
-	log *logp.Logger,
-	version common.Version,
-	index outputs.IndexSelector,
-	pipeline *outil.Selector,
-	data []publisher.Event,
-) ([]publisher.Event, []interface{}) {
-
+func (client *Client) bulkEncodePublishRequest(version common.Version, data []publisher.Event) ([]publisher.Event, []interface{}) {
 	okEvents := data[:0]
 	bulkItems := []interface{}{}
 	for i := range data {
 		event := &data[i].Content
-		meta, err := createEventBulkMeta(log, version, index, pipeline, event)
+		meta, err := client.createEventBulkMeta(version, event)
 		if err != nil {
-			log.Errorf("Failed to encode event meta data: %+v", err)
+			client.log.Errorf("Failed to encode event meta data: %+v", err)
 			continue
 		}
 		if opType := events.GetOpType(*event); opType == events.OpTypeDelete {
@@ -291,25 +287,19 @@ func bulkEncodePublishRequest(
 	return okEvents, bulkItems
 }

-func createEventBulkMeta(
-	log *logp.Logger,
-	version common.Version,
-	indexSel outputs.IndexSelector,
-	pipelineSel *outil.Selector,
-	event *beat.Event,
-) (interface{}, error) {
+func (client *Client) createEventBulkMeta(version common.Version, event *beat.Event) (interface{}, error) {
 	eventType := ""
 	if version.Major < 7 {
 		eventType = defaultEventType
 	}

-	pipeline, err := getPipeline(event, pipelineSel)
+	pipeline, err := client.getPipeline(event)
 	if err != nil {
 		err := fmt.Errorf("failed to select pipeline: %v", err)
 		return nil, err
 	}

-	index, err := indexSel.Select(event)
+	index, err := client.index.Select(event)
 	if err != nil {
 		err := fmt.Errorf("failed to select event index: %v", err)
 		return nil, err
@@ -341,7 +331,7 @@ func createEventBulkMeta(
 	return eslegclient.BulkIndexAction{Index: meta}, nil
 }

-func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error) {
+func (client *Client) getPipeline(event *beat.Event) (string, error) {
 	if event.Meta != nil {
 		pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline)
 		if err == common.ErrKeyNotFound {
@@ -354,8 +344,8 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error)
 		return strings.ToLower(pipeline), nil
 	}

-	if pipelineSel != nil {
-		return pipelineSel.Select(event)
+	if client.pipeline != nil {
+		return client.pipeline.Select(event)
 	}
 	return "", nil
 }

@@ -364,14 +354,10 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error)
 // to be tried again due to the error code returned for those items. If indexing an
 // event failed due to some error in the event itself (e.g. does not respect mapping),
 // the event will be dropped.
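+// With the dead_letter_index policy configured, a rejected event is instead
+// rewritten in place (the original document preserved under `message`, the
+// failure details under `error.type` and `error.message`), marked in its
+// metadata, and retried against the configured dead letter index.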
-func bulkCollectPublishFails(
-	log *logp.Logger,
-	result eslegclient.BulkResult,
-	data []publisher.Event,
-) ([]publisher.Event, bulkResultStats) {
+func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, data []publisher.Event) ([]publisher.Event, bulkResultStats) {
 	reader := newJSONReader(result)
 	if err := bulkReadToItems(reader); err != nil {
-		log.Errorf("failed to parse bulk response: %v", err.Error())
+		client.log.Errorf("failed to parse bulk response: %v", err.Error())
 		return nil, bulkResultStats{}
 	}

@@ -379,9 +365,9 @@ func bulkCollectPublishFails(
 	failed := data[:0]
 	stats := bulkResultStats{}
 	for i := 0; i < count; i++ {
-		status, msg, err := bulkReadItemStatus(log, reader)
+		status, msg, err := bulkReadItemStatus(client.log, reader)
 		if err != nil {
-			log.Error(err)
+			client.log.Error(err)
 			return nil, bulkResultStats{}
 		}

@@ -401,14 +387,35 @@ func bulkCollectPublishFails(
 			if status == http.StatusTooManyRequests {
 				stats.tooMany++
 			} else {
-				// hard failure, don't collect
-				log.Warnf("Cannot index event %#v (status=%v): %s", data[i], status, msg)
-				stats.nonIndexable++
-				continue
+				// hard failure, apply policy action
+				result, _ := data[i].Content.Meta.HasKey(dead_letter_marker_field)
+				if result {
+					stats.nonIndexable++
+					client.log.Errorf("Can't deliver event %#v to dead letter index (status=%v): %s", data[i], status, msg)
+					// poison pill - this will clog the pipeline if the underlying failure is non-transient.
+				} else if client.NonIndexableAction == dead_letter_index {
+					client.log.Warnf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg)
+					if data[i].Content.Meta == nil {
+						data[i].Content.Meta = common.MapStr{
+							dead_letter_marker_field: true,
+						}
+					} else {
+						data[i].Content.Meta.Put(dead_letter_marker_field, true)
+					}
+					data[i].Content.Fields = common.MapStr{
+						"message":       data[i].Content.Fields.String(),
+						"error.type":    status,
+						"error.message": string(msg),
+					}
+				} else { // drop
+					stats.nonIndexable++
+					client.log.Warnf("Cannot index event %#v (status=%v): %s, dropping event!", data[i], status, msg)
+					continue
+				}
 			}
 		}

-		log.Debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
+		client.log.Debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
 		stats.fails++
 		failed = append(failed, data[i])
 	}
diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go
index 67ccc34b8f7..2ad6a1c79b5 100644
--- a/libbeat/outputs/elasticsearch/client_integration_test.go
+++ b/libbeat/outputs/elasticsearch/client_integration_test.go
@@ -200,6 +200,70 @@ func TestClientPublishEventWithPipeline(t *testing.T) {
 	assert.Equal(t, 1, getCount("testfield:0")) // no pipeline
 }

+func TestClientBulkPublishEventsWithDeadletterIndex(t *testing.T) {
+	type obj map[string]interface{}
+
+	logp.TestingSetup(logp.WithSelectors("elasticsearch"))
+
+	index := "beat-int-test-dli-index"
+	deadletterIndex := "beat-int-test-dli-dead-letter-index"
+
+	output, client := connectTestEsWithoutStats(t, obj{
+		"index": index,
+		"non_indexable_policy": map[string]interface{}{
+			"dead_letter_index": map[string]interface{}{
+				"index": deadletterIndex,
+			},
+		},
+	})
+	client.conn.Delete(index, "", "", nil)
+	client.conn.Delete(deadletterIndex, "", "", nil)
+
+	err := output.Publish(context.Background(), outest.NewBatch(beat.Event{
+		Timestamp: time.Now(),
+		Fields: common.MapStr{
+			"type":      "libbeat",
+			"message":   "Test message 1",
+			"testfield": 0,
+		},
+	}))
if err != nil { + t.Fatal(err) + } + + batch := outest.NewBatch(beat.Event{ + Timestamp: time.Now(), + Fields: common.MapStr{ + "type": "libbeat", + "message": "Test message 2", + "testfield": "foo0", + }, + }) + err = output.Publish(context.Background(), batch) + if err == nil { + t.Fatal("Expecting mapping conflict") + } + _, _, err = client.conn.Refresh(deadletterIndex) + if err == nil { + t.Fatal("expecting index to not exist yet") + } + err = output.Publish(context.Background(), batch) + if err != nil { + t.Fatal(err) + } + + _, _, err = client.conn.Refresh(index) + if err != nil { + t.Fatal(err) + } + + _, _, err = client.conn.Refresh(deadletterIndex) + if err != nil { + t.Fatal(err) + } + +} + func TestClientBulkPublishEventsWithPipeline(t *testing.T) { type obj map[string]interface{} diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index bd28fe5850b..02be8038c40 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -44,6 +44,14 @@ import ( ) func TestCollectPublishFailsNone(t *testing.T) { + client, err := NewClient( + ClientSettings{ + NonIndexableAction: "drop", + }, + nil, + ) + assert.NoError(t, err) + N := 100 item := `{"create": {"status": 200}},` response := []byte(`{"items": [` + strings.Repeat(item, N) + `]}`) @@ -54,11 +62,19 @@ func TestCollectPublishFailsNone(t *testing.T) { events[i] = publisher.Event{Content: beat.Event{Fields: event}} } - res, _ := bulkCollectPublishFails(logp.L(), response, events) + res, _ := client.bulkCollectPublishFails(response, events) assert.Equal(t, 0, len(res)) } func TestCollectPublishFailMiddle(t *testing.T) { + client, err := NewClient( + ClientSettings{ + NonIndexableAction: "drop", + }, + nil, + ) + assert.NoError(t, err) + response := []byte(` { "items": [ {"create": {"status": 200}}, @@ -71,15 +87,124 @@ func TestCollectPublishFailMiddle(t *testing.T) { eventFail := publisher.Event{Content: beat.Event{Fields: common.MapStr{"field": 2}}} events := []publisher.Event{event, eventFail, event} - res, stats := bulkCollectPublishFails(logp.L(), response, events) + res, stats := client.bulkCollectPublishFails(response, events) assert.Equal(t, 1, len(res)) if len(res) == 1 { assert.Equal(t, eventFail, res[0]) } - assert.Equal(t, stats, bulkResultStats{acked: 2, fails: 1, tooMany: 1}) + assert.Equal(t, bulkResultStats{acked: 2, fails: 1, tooMany: 1}, stats) +} + +func TestCollectPublishFailDeadLetterQueue(t *testing.T) { + client, err := NewClient( + ClientSettings{ + NonIndexableAction: "dead_letter_index", + }, + nil, + ) + assert.NoError(t, err) + + response := []byte(` + { "items": [ + {"create": {"status": 200}}, + {"create": { + "error" : { + "root_cause" : [ + { + "type" : "mapper_parsing_exception", + "reason" : "failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'" + } + ], + "type" : "mapper_parsing_exception", + "reason" : "failed to parse field [bar] of type [long] in document with id '1'. 
Preview of field's value: 'bar1'", + "caused_by" : { + "type" : "illegal_argument_exception", + "reason" : "For input string: \"bar1\"" + } + }, + "status" : 400 + } + }, + {"create": {"status": 200}} + ]} + `) + + event := publisher.Event{Content: beat.Event{Fields: common.MapStr{"bar": 1}}} + eventFail := publisher.Event{Content: beat.Event{Fields: common.MapStr{"bar": "bar1"}}} + events := []publisher.Event{event, eventFail, event} + + res, stats := client.bulkCollectPublishFails(response, events) + assert.Equal(t, 1, len(res)) + if len(res) == 1 { + expected := publisher.Event{ + Content: beat.Event{ + Fields: common.MapStr{ + "message": "{\"bar\":\"bar1\"}", + "error.type": 400, + "error.message": "{\n\t\t\t\"root_cause\" : [\n\t\t\t {\n\t\t\t\t\"type\" : \"mapper_parsing_exception\",\n\t\t\t\t\"reason\" : \"failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'\"\n\t\t\t }\n\t\t\t],\n\t\t\t\"type\" : \"mapper_parsing_exception\",\n\t\t\t\"reason\" : \"failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'\",\n\t\t\t\"caused_by\" : {\n\t\t\t \"type\" : \"illegal_argument_exception\",\n\t\t\t \"reason\" : \"For input string: \\\"bar1\\\"\"\n\t\t\t}\n\t\t }", + }, + Meta: common.MapStr{ + dead_letter_marker_field: true, + }, + }, + } + assert.Equal(t, expected, res[0]) + } + assert.Equal(t, bulkResultStats{acked: 2, fails: 1, nonIndexable: 0}, stats) +} + +func TestCollectPublishFailDrop(t *testing.T) { + client, err := NewClient( + ClientSettings{ + NonIndexableAction: "drop", + }, + nil, + ) + assert.NoError(t, err) + + response := []byte(` + { "items": [ + {"create": {"status": 200}}, + {"create": { + "error" : { + "root_cause" : [ + { + "type" : "mapper_parsing_exception", + "reason" : "failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'" + } + ], + "type" : "mapper_parsing_exception", + "reason" : "failed to parse field [bar] of type [long] in document with id '1'. 
Preview of field's value: 'bar1'", + "caused_by" : { + "type" : "illegal_argument_exception", + "reason" : "For input string: \"bar1\"" + } + }, + "status" : 400 + } + }, + {"create": {"status": 200}} + ]} + `) + + event := publisher.Event{Content: beat.Event{Fields: common.MapStr{"bar": 1}}} + eventFail := publisher.Event{Content: beat.Event{Fields: common.MapStr{"bar": "bar1"}}} + events := []publisher.Event{event, eventFail, event} + + res, stats := client.bulkCollectPublishFails(response, events) + assert.Equal(t, 0, len(res)) + assert.Equal(t, bulkResultStats{acked: 2, fails: 0, nonIndexable: 1}, stats) } func TestCollectPublishFailAll(t *testing.T) { + client, err := NewClient( + ClientSettings{ + NonIndexableAction: "drop", + }, + nil, + ) + assert.NoError(t, err) + response := []byte(` { "items": [ {"create": {"status": 429, "error": "ups"}}, @@ -91,7 +216,7 @@ func TestCollectPublishFailAll(t *testing.T) { event := publisher.Event{Content: beat.Event{Fields: common.MapStr{"field": 2}}} events := []publisher.Event{event, event, event} - res, stats := bulkCollectPublishFails(logp.L(), response, events) + res, stats := client.bulkCollectPublishFails(response, events) assert.Equal(t, 3, len(res)) assert.Equal(t, events, res) assert.Equal(t, stats, bulkResultStats{fails: 3, tooMany: 3}) @@ -100,6 +225,14 @@ func TestCollectPublishFailAll(t *testing.T) { func TestCollectPipelinePublishFail(t *testing.T) { logp.TestingSetup(logp.WithSelectors("elasticsearch")) + client, err := NewClient( + ClientSettings{ + NonIndexableAction: "drop", + }, + nil, + ) + assert.NoError(t, err) + response := []byte(`{ "took": 0, "ingest_took": 0, "errors": true, "items": [ @@ -132,12 +265,20 @@ func TestCollectPipelinePublishFail(t *testing.T) { event := publisher.Event{Content: beat.Event{Fields: common.MapStr{"field": 2}}} events := []publisher.Event{event} - res, _ := bulkCollectPublishFails(logp.L(), response, events) + res, _ := client.bulkCollectPublishFails(response, events) assert.Equal(t, 1, len(res)) assert.Equal(t, events, res) } func BenchmarkCollectPublishFailsNone(b *testing.B) { + client, err := NewClient( + ClientSettings{ + NonIndexableAction: "drop", + }, + nil, + ) + assert.NoError(b, err) + response := []byte(` { "items": [ {"create": {"status": 200}}, @@ -150,7 +291,7 @@ func BenchmarkCollectPublishFailsNone(b *testing.B) { events := []publisher.Event{event, event, event} for i := 0; i < b.N; i++ { - res, _ := bulkCollectPublishFails(logp.L(), response, events) + res, _ := client.bulkCollectPublishFails(response, events) if len(res) != 0 { b.Fail() } @@ -158,6 +299,14 @@ func BenchmarkCollectPublishFailsNone(b *testing.B) { } func BenchmarkCollectPublishFailMiddle(b *testing.B) { + client, err := NewClient( + ClientSettings{ + NonIndexableAction: "drop", + }, + nil, + ) + assert.NoError(b, err) + response := []byte(` { "items": [ {"create": {"status": 200}}, @@ -171,7 +320,7 @@ func BenchmarkCollectPublishFailMiddle(b *testing.B) { events := []publisher.Event{event, eventFail, event} for i := 0; i < b.N; i++ { - res, _ := bulkCollectPublishFails(logp.L(), response, events) + res, _ := client.bulkCollectPublishFails(response, events) if len(res) != 1 { b.Fail() } @@ -179,6 +328,14 @@ func BenchmarkCollectPublishFailMiddle(b *testing.B) { } func BenchmarkCollectPublishFailAll(b *testing.B) { + client, err := NewClient( + ClientSettings{ + NonIndexableAction: "drop", + }, + nil, + ) + assert.NoError(b, err) + response := []byte(` { "items": [ {"creatMiddlee": {"status": 429, "error": 
"ups"}}, @@ -191,7 +348,7 @@ func BenchmarkCollectPublishFailAll(b *testing.B) { events := []publisher.Event{event, event, event} for i := 0; i < b.N; i++ { - res, _ := bulkCollectPublishFails(logp.L(), response, events) + res, _ := client.bulkCollectPublishFails(response, events) if len(res) != 3 { b.Fail() } @@ -295,7 +452,16 @@ func TestBulkEncodeEvents(t *testing.T) { } } - encoded, bulkItems := bulkEncodePublishRequest(logp.L(), *common.MustNewVersion(test.version), index, pipeline, events) + client, err := NewClient( + ClientSettings{ + Index: index, + Pipeline: pipeline, + }, + nil, + ) + assert.NoError(t, err) + + encoded, bulkItems := client.bulkEncodePublishRequest(*common.MustNewVersion(test.version), events) assert.Equal(t, len(events), len(encoded), "all events should have been encoded") assert.Equal(t, 2*len(events), len(bulkItems), "incomplete bulk") @@ -361,7 +527,15 @@ func TestBulkEncodeEventsWithOpType(t *testing.T) { } } - encoded, bulkItems := bulkEncodePublishRequest(logp.L(), *common.MustNewVersion(version.GetDefaultVersion()), index, pipeline, events) + client, err := NewClient( + ClientSettings{ + Index: index, + Pipeline: pipeline, + }, + nil, + ) + + encoded, bulkItems := client.bulkEncodePublishRequest(*common.MustNewVersion(version.GetDefaultVersion()), events) require.Equal(t, len(events)-1, len(encoded), "all events should have been encoded") require.Equal(t, 9, len(bulkItems), "incomplete bulk") diff --git a/libbeat/outputs/elasticsearch/config.go b/libbeat/outputs/elasticsearch/config.go index bf2f7932fba..816078f9225 100644 --- a/libbeat/outputs/elasticsearch/config.go +++ b/libbeat/outputs/elasticsearch/config.go @@ -21,25 +21,28 @@ import ( "fmt" "time" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" "github.com/elastic/beats/v7/libbeat/common/transport/kerberos" ) type elasticsearchConfig struct { - Protocol string `config:"protocol"` - Path string `config:"path"` - Params map[string]string `config:"parameters"` - Headers map[string]string `config:"headers"` - Username string `config:"username"` - Password string `config:"password"` - APIKey string `config:"api_key"` - LoadBalance bool `config:"loadbalance"` - CompressionLevel int `config:"compression_level" validate:"min=0, max=9"` - EscapeHTML bool `config:"escape_html"` - Kerberos *kerberos.Config `config:"kerberos"` - BulkMaxSize int `config:"bulk_max_size"` - MaxRetries int `config:"max_retries"` - Backoff Backoff `config:"backoff"` + Protocol string `config:"protocol"` + Path string `config:"path"` + Params map[string]string `config:"parameters"` + Headers map[string]string `config:"headers"` + Username string `config:"username"` + Password string `config:"password"` + APIKey string `config:"api_key"` + LoadBalance bool `config:"loadbalance"` + CompressionLevel int `config:"compression_level" validate:"min=0, max=9"` + EscapeHTML bool `config:"escape_html"` + Kerberos *kerberos.Config `config:"kerberos"` + BulkMaxSize int `config:"bulk_max_size"` + MaxRetries int `config:"max_retries"` + Backoff Backoff `config:"backoff"` + NonIndexablePolicy *common.ConfigNamespace `config:"non_indexable_policy"` Transport httpcommon.HTTPTransportSettings `config:",inline"` } diff --git a/libbeat/outputs/elasticsearch/config_test.go b/libbeat/outputs/elasticsearch/config_test.go new file mode 100644 index 00000000000..15d934a1e86 --- /dev/null +++ b/libbeat/outputs/elasticsearch/config_test.go @@ -0,0 +1,106 @@ +// Licensed to Elasticsearch B.V. 
under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package elasticsearch + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestValidDropPolicyConfig(t *testing.T) { + config := ` +non_indexable_policy.drop: ~ +` + c := common.MustNewConfigFrom(config) + elasticsearchOutputConfig, err := readConfig(c) + if err != nil { + t.Fatalf("Can't create test configuration from valid input") + } + policy, err := newNonIndexablePolicy(elasticsearchOutputConfig.NonIndexablePolicy) + if err != nil { + t.Fatalf("Can't create test configuration from valid input") + } + assert.Equal(t, drop, policy.action(), "action should be drop") +} + +func TestDeadLetterIndexPolicyConfig(t *testing.T) { + config := ` +non_indexable_policy.dead_letter_index: + index: "my-dead-letter-index" +` + c := common.MustNewConfigFrom(config) + elasticsearchOutputConfig, err := readConfig(c) + if err != nil { + t.Fatalf("Can't create test configuration from valid input") + } + policy, err := newNonIndexablePolicy(elasticsearchOutputConfig.NonIndexablePolicy) + if err != nil { + t.Fatalf("Can't create test configuration from valid input") + } + assert.Equal(t, "my-dead-letter-index", policy.index(), "index should match config") +} + +func TestInvalidNonIndexablePolicyConfig(t *testing.T) { + tests := map[string]string{ + "non_indexable_policy with invalid policy": ` +non_indexable_policy.juggle: ~ +`, + "dead_Letter_index policy without properties": ` +non_indexable_policy.dead_letter_index: ~ +`, + "dead_Letter_index policy without index": ` +non_indexable_policy.dead_letter_index: + foo: "bar" +`, + "dead_Letter_index policy nil index": ` +non_indexable_policy.dead_letter_index: + index: ~ +`, + "dead_Letter_index policy empty index": ` +non_indexable_policy.dead_letter_index: + index: "" +`, + } + + for name, test := range tests { + test := test + t.Run(name, func(t *testing.T) { + c := common.MustNewConfigFrom(test) + elasticsearchOutputConfig, err := readConfig(c) + if err != nil { + t.Fatalf("Can't create test configuration from valid input") + } + _, err = newNonIndexablePolicy(elasticsearchOutputConfig.NonIndexablePolicy) + if err == nil { + t.Fatalf("Can create test configuration from invalid input") + } + t.Logf("error %s", err.Error()) + }) + } +} + +func readConfig(cfg *common.Config) (*elasticsearchConfig, error) { + c := defaultConfig + if err := cfg.Unpack(&c); err != nil { + return nil, err + } + return &c, nil +} diff --git a/libbeat/outputs/elasticsearch/death_letter_selector.go b/libbeat/outputs/elasticsearch/death_letter_selector.go new file mode 100644 index 00000000000..02bd3780cab --- /dev/null +++ b/libbeat/outputs/elasticsearch/death_letter_selector.go @@ -0,0 +1,36 @@ +// Licensed to Elasticsearch B.V. 
under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package elasticsearch
+
+import (
+	"github.com/elastic/beats/v7/libbeat/beat"
+	"github.com/elastic/beats/v7/libbeat/outputs"
+)
+
+type DeadLetterSelector struct {
+	Selector        outputs.IndexSelector
+	DeadLetterIndex string
+}
+
+func (d DeadLetterSelector) Select(event *beat.Event) (string, error) {
+	result, _ := event.Meta.HasKey(dead_letter_marker_field)
+	if result {
+		return d.DeadLetterIndex, nil
+	}
+	return d.Selector.Select(event)
+}
diff --git a/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc b/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc
index 80b2bb36879..ebe22854bee 100644
--- a/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc
+++ b/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc
@@ -682,3 +682,38 @@ or <> for more information.

 Configuration options for Kerberos authentication.

 See <> for more information.
+
+===== `non_indexable_policy`
+
+Specifies the behavior when the Elasticsearch cluster explicitly rejects documents, for example on mapping conflicts.
+
+====== `drop`
+The default behaviour: when an event is explicitly rejected by Elasticsearch, it is dropped.
+
+["source","yaml"]
+------------------------------------------------------------------------------
+output.elasticsearch:
+  hosts: ["http://localhost:9200"]
+  non_indexable_policy.drop: ~
+------------------------------------------------------------------------------
+
+====== `dead_letter_index`
+
+beta[]
+
+On an explicit rejection, this policy retries the event in the next batch; however, the target index changes
+to the index specified. In addition, the structure of the event is changed to the following fields:
+
+message:: Contains the escaped JSON of the original event.
+error.type:: Contains the HTTP status code returned by Elasticsearch.
+error.message:: Contains the error message returned by Elasticsearch, describing why the event was rejected.
+
+`index`:: The index to send rejected events to.
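+
+For example, an event `{"bar": "bar1"}` rejected by a mapping conflict would be
+rewritten roughly as follows (illustrative sketch only; `error.message` carries
+the raw error object returned by Elasticsearch):
+
+["source","json"]
+------------------------------------------------------------------------------
+{
+  "message": "{\"bar\":\"bar1\"}",
+  "error.type": 400,
+  "error.message": "{ \"type\": \"mapper_parsing_exception\", \"reason\": \"failed to parse field [bar] of type [long] ...\" }"
+}
+------------------------------------------------------------------------------
+
+An example configuration: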
+
+["source","yaml"]
+------------------------------------------------------------------------------
+output.elasticsearch:
+  hosts: ["http://localhost:9200"]
+  non_indexable_policy.dead_letter_index:
+    index: "my-dead-letter-index"
+------------------------------------------------------------------------------
diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go
index 682d0e5a41e..417043eff83 100644
--- a/libbeat/outputs/elasticsearch/elasticsearch.go
+++ b/libbeat/outputs/elasticsearch/elasticsearch.go
@@ -53,6 +53,12 @@ func makeES(
 		return outputs.Fail(err)
 	}

+	policy, err := newNonIndexablePolicy(config.NonIndexablePolicy)
+	if err != nil {
+		log.Errorf("error while creating non-indexable policy: %v", err)
+		return outputs.Fail(err)
+	}
+
 	hosts, err := outputs.ReadHostList(cfg)
 	if err != nil {
 		return outputs.Fail(err)
@@ -67,6 +73,13 @@ func makeES(
 		params = nil
 	}

+	if policy.action() == dead_letter_index {
+		index = DeadLetterSelector{
+			Selector:        index,
+			DeadLetterIndex: policy.index(),
+		}
+	}
+
 	clients := make([]outputs.NetworkClient, len(hosts))
 	for i, host := range hosts {
 		esURL, err := common.MakeURL(config.Protocol, config.Path, host, 9200)
@@ -90,9 +103,10 @@ func makeES(
 				EscapeHTML: config.EscapeHTML,
 				Transport:  config.Transport,
 			},
-			Index:    index,
-			Pipeline: pipeline,
-			Observer: observer,
+			Index:              index,
+			Pipeline:           pipeline,
+			Observer:           observer,
+			NonIndexableAction: policy.action(),
 		}, &connectCallbackRegistry)
 		if err != nil {
 			return outputs.Fail(err)
diff --git a/libbeat/outputs/elasticsearch/elasticsearch_test.go b/libbeat/outputs/elasticsearch/elasticsearch_test.go
index df757d570dd..f84d32c2ccd 100644
--- a/libbeat/outputs/elasticsearch/elasticsearch_test.go
+++ b/libbeat/outputs/elasticsearch/elasticsearch_test.go
@@ -21,6 +21,8 @@ import (
 	"fmt"
 	"testing"

+	"github.com/stretchr/testify/assert"
+
 	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/common"
 	"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
@@ -116,11 +118,20 @@ func TestPipelineSelection(t *testing.T) {
 	for name, test := range cases {
 		t.Run(name, func(t *testing.T) {
 			selector, err := buildPipelineSelector(common.MustNewConfigFrom(test.cfg))
 			if err != nil {
 				t.Fatalf("Failed to parse configuration: %v", err)
 			}
+
+			client, err := NewClient(
+				ClientSettings{
+					Pipeline: &selector,
+				},
+				nil,
+			)
+			assert.NoError(t, err)

-			got, err := getPipeline(&test.event, &selector)
+			got, err := client.getPipeline(&test.event)
 			if err != nil {
 				t.Fatalf("Failed to create pipeline name: %v", err)
 			}
diff --git a/libbeat/outputs/elasticsearch/non_indexable_policy.go b/libbeat/outputs/elasticsearch/non_indexable_policy.go
new file mode 100644
index 00000000000..8602fdb1245
--- /dev/null
+++ b/libbeat/outputs/elasticsearch/non_indexable_policy.go
@@ -0,0 +1,104 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package elasticsearch
+
+import (
+	"fmt"
+
+	"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
+
+	"github.com/elastic/beats/v7/libbeat/common"
+)
+
+const (
+	dead_letter_marker_field = "deadlettered"
+	drop                     = "drop"
+	dead_letter_index        = "dead_letter_index"
+)
+
+type DropPolicy struct{}
+
+func (d DropPolicy) action() string {
+	return drop
+}
+
+func (d DropPolicy) index() string {
+	panic("drop policy doesn't have a target index")
+}
+
+type DeadLetterIndexPolicy struct {
+	Index string
+}
+
+func (d DeadLetterIndexPolicy) action() string {
+	return dead_letter_index
+}
+
+func (d DeadLetterIndexPolicy) index() string {
+	return d.Index
+}
+
+type nonIndexablePolicy interface {
+	action() string
+	index() string
+}
+
+var (
+	policyFactories = map[string]policyFactory{
+		drop:              newDropPolicy,
+		dead_letter_index: newDeadLetterIndexPolicy,
+	}
+)
+
+func newDeadLetterIndexPolicy(config *common.Config) (nonIndexablePolicy, error) {
+	cfgwarn.Beta("The non_indexable_policy dead_letter_index is beta.")
+	policy := DeadLetterIndexPolicy{}
+	if err := config.Unpack(&policy); err != nil {
+		return nil, err
+	}
+	if policy.index() == "" {
+		return nil, fmt.Errorf("%s policy requires an `index` to be specified", dead_letter_index)
+	}
+	return policy, nil
+}
+
+func newDropPolicy(*common.Config) (nonIndexablePolicy, error) {
+	return defaultDropPolicy(), nil
+}
+
+func defaultPolicy() nonIndexablePolicy {
+	return defaultDropPolicy()
+}
+
+func defaultDropPolicy() nonIndexablePolicy {
+	return &DropPolicy{}
+}
+
+type policyFactory func(config *common.Config) (nonIndexablePolicy, error)
+
+func newNonIndexablePolicy(configNamespace *common.ConfigNamespace) (nonIndexablePolicy, error) {
+	if configNamespace == nil {
+		return defaultPolicy(), nil
+	}
+
+	policyType := configNamespace.Name()
+	factory, ok := policyFactories[policyType]
+	if !ok {
+		return nil, fmt.Errorf("no such policy type: %s", policyType)
+	}
+
+	return factory(configNamespace.Config())
+}
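A minimal sketch of how the pieces introduced by this patch compose, mirroring
what `makeES` does above. The function name `wireNonIndexablePolicy` and the
`baseSelector` parameter are illustrative only, not part of the patch:

func wireNonIndexablePolicy(rawCfg *common.Config, baseSelector outputs.IndexSelector) (outputs.IndexSelector, string, error) {
	// Unpack the output config; defaultConfig comes from config.go.
	esCfg := defaultConfig
	if err := rawCfg.Unpack(&esCfg); err != nil {
		return nil, "", err
	}

	// Resolve the configured policy; defaults to drop when the
	// non_indexable_policy namespace is absent.
	policy, err := newNonIndexablePolicy(esCfg.NonIndexablePolicy)
	if err != nil {
		return nil, "", err
	}

	index := baseSelector
	if policy.action() == dead_letter_index {
		// Wrap the normal selector: events marked with
		// dead_letter_marker_field are routed to the dead letter index,
		// all other events keep their usual destination.
		index = DeadLetterSelector{
			Selector:        baseSelector,
			DeadLetterIndex: policy.index(),
		}
	}

	// The action string is what ClientSettings.NonIndexableAction expects.
	return index, policy.action(), nil
}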