Skip to content

Commit

Permalink
Non-indexable policy: dead letter index (#26952)
Browse files Browse the repository at this point in the history
Adds functionality to index events into a dead letter index instead of dropping them.

Previously, when there was a mapping conflict, events were dropped. This commit adds the option to attempt indexing such events into a different index instead, so they can be re-ingested later rather than being dropped or stalling ingest. The dead letter index currently must reside on the same Elasticsearch cluster.

* buildCollectPublishFails and other functions changed to be a function of Client so not to have to pass arguments

Co-authored-by: Noémi Ványi <kvch@users.noreply.github.com>
(cherry picked from commit 60cf09d)
  • Loading branch information
mjmbischoff authored and mergify-bot committed Aug 2, 2021
1 parent 8500e47 commit d18dc64
Show file tree
Hide file tree
Showing 11 changed files with 631 additions and 75 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Libbeat: report beat version to monitoring. {pull}26214[26214]
- Ensure common proxy settings support in HTTP clients: proxy_disabled, proxy_url, proxy_headers and typical environment variables HTTP_PROXY, HTTPS_PROXY, NOPROXY. {pull}25219[25219]
- Add proxy support for AWS functions. {pull}26832[26832]
- Added policies to the elasticsearch output for non-indexable events {pull}26952[26952]
- Add sha256 digests to RPM packages. {issue}23670[23670]

*Auditbeat*

Expand Down
101 changes: 54 additions & 47 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,26 @@ type Client struct {
index outputs.IndexSelector
pipeline *outil.Selector

observer outputs.Observer
observer outputs.Observer
NonIndexableAction string

log *logp.Logger
}

// ClientSettings contains the settings for a client.
type ClientSettings struct {
eslegclient.ConnectionSettings
Index outputs.IndexSelector
Pipeline *outil.Selector
Observer outputs.Observer
Index outputs.IndexSelector
Pipeline *outil.Selector
Observer outputs.Observer
NonIndexableAction string
}

type bulkResultStats struct {
acked int // number of events ACKed by Elasticsearch
duplicates int // number of events failed with `create` due to ID already being indexed
fails int // number of failed events (can be retried)
nonIndexable int // number of failed events (not indexable -> must be dropped)
nonIndexable int // number of failed events (not indexable)
tooMany int // number of events receiving HTTP 429 Too Many Requests
}

Expand Down Expand Up @@ -123,11 +125,11 @@ func NewClient(
}

client := &Client{
conn: *conn,
index: s.Index,
pipeline: pipeline,

observer: s.Observer,
conn: *conn,
index: s.Index,
pipeline: pipeline,
observer: s.Observer,
NonIndexableAction: s.NonIndexableAction,

log: logp.NewLogger("elasticsearch"),
}
Expand Down Expand Up @@ -166,6 +168,7 @@ func (client *Client) Clone() *Client {
ConnectionSettings: connection,
Index: client.index,
Pipeline: client.pipeline,
NonIndexableAction: client.NonIndexableAction,
},
nil, // XXX: do not pass connection callback?
)
Expand Down Expand Up @@ -204,7 +207,7 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
// events slice
origCount := len(data)
span.Context.SetLabel("events_original", origCount)
data, bulkItems := bulkEncodePublishRequest(client.log, client.conn.GetVersion(), client.index, client.pipeline, data)
data, bulkItems := client.bulkEncodePublishRequest(client.conn.GetVersion(), data)
newCount := len(data)
span.Context.SetLabel("events_encoded", newCount)
if st != nil && origCount > newCount {
Expand Down Expand Up @@ -235,7 +238,7 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
failedEvents = data
stats.fails = len(failedEvents)
} else {
failedEvents, stats = bulkCollectPublishFails(client.log, result, data)
failedEvents, stats = client.bulkCollectPublishFails(result, data)
}

failed := len(failedEvents)
Expand Down Expand Up @@ -263,21 +266,14 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)

// bulkEncodePublishRequest encodes all bulk requests and returns slice of events
// successfully added to the list of bulk items and the list of bulk items.
func bulkEncodePublishRequest(
log *logp.Logger,
version common.Version,
index outputs.IndexSelector,
pipeline *outil.Selector,
data []publisher.Event,
) ([]publisher.Event, []interface{}) {

func (client *Client) bulkEncodePublishRequest(version common.Version, data []publisher.Event) ([]publisher.Event, []interface{}) {
okEvents := data[:0]
bulkItems := []interface{}{}
for i := range data {
event := &data[i].Content
meta, err := createEventBulkMeta(log, version, index, pipeline, event)
meta, err := client.createEventBulkMeta(version, event)
if err != nil {
log.Errorf("Failed to encode event meta data: %+v", err)
client.log.Errorf("Failed to encode event meta data: %+v", err)
continue
}
if opType := events.GetOpType(*event); opType == events.OpTypeDelete {
Expand All @@ -291,25 +287,19 @@ func bulkEncodePublishRequest(
return okEvents, bulkItems
}

func createEventBulkMeta(
log *logp.Logger,
version common.Version,
indexSel outputs.IndexSelector,
pipelineSel *outil.Selector,
event *beat.Event,
) (interface{}, error) {
func (client *Client) createEventBulkMeta(version common.Version, event *beat.Event) (interface{}, error) {
eventType := ""
if version.Major < 7 {
eventType = defaultEventType
}

pipeline, err := getPipeline(event, pipelineSel)
pipeline, err := client.getPipeline(event)
if err != nil {
err := fmt.Errorf("failed to select pipeline: %v", err)
return nil, err
}

index, err := indexSel.Select(event)
index, err := client.index.Select(event)
if err != nil {
err := fmt.Errorf("failed to select event index: %v", err)
return nil, err
Expand Down Expand Up @@ -341,7 +331,7 @@ func createEventBulkMeta(
return eslegclient.BulkIndexAction{Index: meta}, nil
}

func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error) {
func (client *Client) getPipeline(event *beat.Event) (string, error) {
if event.Meta != nil {
pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline)
if err == common.ErrKeyNotFound {
Expand All @@ -354,8 +344,8 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error)
return strings.ToLower(pipeline), nil
}

if pipelineSel != nil {
return pipelineSel.Select(event)
if client.pipeline != nil {
return client.pipeline.Select(event)
}
return "", nil
}
Expand All @@ -364,24 +354,20 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error)
// to be tried again due to error code returned for that items. If indexing an
// event failed due to some error in the event itself (e.g. does not respect mapping),
// the event will be dropped.
func bulkCollectPublishFails(
log *logp.Logger,
result eslegclient.BulkResult,
data []publisher.Event,
) ([]publisher.Event, bulkResultStats) {
func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, data []publisher.Event) ([]publisher.Event, bulkResultStats) {
reader := newJSONReader(result)
if err := bulkReadToItems(reader); err != nil {
log.Errorf("failed to parse bulk response: %v", err.Error())
client.log.Errorf("failed to parse bulk response: %v", err.Error())
return nil, bulkResultStats{}
}

count := len(data)
failed := data[:0]
stats := bulkResultStats{}
for i := 0; i < count; i++ {
status, msg, err := bulkReadItemStatus(log, reader)
status, msg, err := bulkReadItemStatus(client.log, reader)
if err != nil {
log.Error(err)
client.log.Error(err)
return nil, bulkResultStats{}
}

Expand All @@ -401,14 +387,35 @@ func bulkCollectPublishFails(
if status == http.StatusTooManyRequests {
stats.tooMany++
} else {
// hard failure, don't collect
log.Warnf("Cannot index event %#v (status=%v): %s", data[i], status, msg)
stats.nonIndexable++
continue
// hard failure, apply policy action
result, _ := data[i].Content.Meta.HasKey(dead_letter_marker_field)
if result {
stats.nonIndexable++
client.log.Errorf("Can't deliver to dead letter index event %#v (status=%v): %s", data[i], status, msg)
// poison pill - this will clog the pipeline if the underlying failure is non transient.
} else if client.NonIndexableAction == dead_letter_index {
client.log.Warnf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg)
if data[i].Content.Meta == nil {
data[i].Content.Meta = common.MapStr{
dead_letter_marker_field: true,
}
} else {
data[i].Content.Meta.Put(dead_letter_marker_field, true)
}
data[i].Content.Fields = common.MapStr{
"message": data[i].Content.Fields.String(),
"error.type": status,
"error.message": string(msg),
}
} else { // drop
stats.nonIndexable++
client.log.Warnf("Cannot index event %#v (status=%v): %s, dropping event!", data[i], status, msg)
continue
}
}
}

log.Debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
client.log.Debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
stats.fails++
failed = append(failed, data[i])
}
Expand Down
64 changes: 64 additions & 0 deletions libbeat/outputs/elasticsearch/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,70 @@ func TestClientPublishEventWithPipeline(t *testing.T) {
assert.Equal(t, 1, getCount("testfield:0")) // no pipeline
}

// TestClientBulkPublishEventsWithDeadletterIndex is an integration test for
// the "dead_letter_index" non-indexable policy: an event that fails to index
// due to a mapping conflict should, when the batch is retried, be delivered
// to the configured dead letter index instead of being dropped.
// NOTE(review): requires a live Elasticsearch instance reachable via
// connectTestEsWithoutStats — presumably provided by the integration test
// environment; confirm before running standalone.
func TestClientBulkPublishEventsWithDeadletterIndex(t *testing.T) {
type obj map[string]interface{}

logp.TestingSetup(logp.WithSelectors("elasticsearch"))

// Primary target index and the dead letter index that the conflicting
// event is expected to land in.
index := "beat-int-test-dli-index"
deadletterIndex := "beat-int-test-dli-dead-letter-index"

// Configure the output with the dead_letter_index policy pointing at
// deadletterIndex.
output, client := connectTestEsWithoutStats(t, obj{
"index": index,
"non_indexable_policy": map[string]interface{}{
"dead_letter_index": map[string]interface{}{
"index": deadletterIndex,
},
},
})
// Start from a clean slate. Delete errors are deliberately ignored:
// the indices may not exist on a fresh cluster.
client.conn.Delete(index, "", "", nil)
client.conn.Delete(deadletterIndex, "", "", nil)

// First publish succeeds and implicitly creates the index mapping with
// "testfield" as a numeric field (value 0).
err := output.Publish(context.Background(), outest.NewBatch(beat.Event{
Timestamp: time.Now(),
Fields: common.MapStr{
"type": "libbeat",
"message": "Test message 1",
"testfield": 0,
},
}))
if err != nil {
t.Fatal(err)
}

// Second event sends "testfield" as a string ("foo0"), which conflicts
// with the numeric mapping established above.
batch := outest.NewBatch(beat.Event{
Timestamp: time.Now(),
Fields: common.MapStr{
"type": "libbeat",
"message": "Test message 2",
"testfield": "foo0",
},
})
// The first publish attempt must fail with a mapping conflict; the
// event is marked for the dead letter index rather than delivered.
err = output.Publish(context.Background(), batch)
if err == nil {
t.Fatal("Expecting mapping conflict")
}
// The dead letter index must not have been created by the failed
// attempt — delivery there only happens on retry.
_, _, err = client.conn.Refresh(deadletterIndex)
if err == nil {
t.Fatal("expecting index to not exist yet")
}
// Retrying the same batch should now succeed by writing the
// conflicting event to the dead letter index.
err = output.Publish(context.Background(), batch)
if err != nil {
t.Fatal(err)
}

// Both indices should now exist and be refreshable.
_, _, err = client.conn.Refresh(index)
if err != nil {
t.Fatal(err)
}

_, _, err = client.conn.Refresh(deadletterIndex)
if err != nil {
t.Fatal(err)
}

}

func TestClientBulkPublishEventsWithPipeline(t *testing.T) {
type obj map[string]interface{}

Expand Down
Loading

0 comments on commit d18dc64

Please sign in to comment.