Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding functionality to dead letter index instead of dropping events. #26952

Merged
merged 17 commits into from
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Ensure common proxy settings support in HTTP clients: proxy_disabled, proxy_url, proxy_headers and typical environment variables HTTP_PROXY, HTTPS_PROXY, NOPROXY. {pull}25219[25219]
- `add_process_metadata` processor enrich process information with owner name and id. {issue}21068[21068] {pull}21111[21111]
- Add proxy support for AWS functions. {pull}26832[26832]
- Added policies to the elasticsearch output for non-indexable events {pull}26952[26952]
- Add sha256 digests to RPM packages. {issue}23670[23670]

*Auditbeat*
Expand Down
101 changes: 54 additions & 47 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,26 @@ type Client struct {
index outputs.IndexSelector
pipeline *outil.Selector

observer outputs.Observer
observer outputs.Observer
NonIndexableAction string

log *logp.Logger
}

// ClientSettings contains the settings for a client.
type ClientSettings struct {
eslegclient.ConnectionSettings
Index outputs.IndexSelector
Pipeline *outil.Selector
Observer outputs.Observer
Index outputs.IndexSelector
Pipeline *outil.Selector
Observer outputs.Observer
NonIndexableAction string
}

type bulkResultStats struct {
acked int // number of events ACKed by Elasticsearch
duplicates int // number of events failed with `create` due to ID already being indexed
fails int // number of failed events (can be retried)
nonIndexable int // number of failed events (not indexable -> must be dropped)
mjmbischoff marked this conversation as resolved.
Show resolved Hide resolved
nonIndexable int // number of failed events (not indexable)
tooMany int // number of events receiving HTTP 429 Too Many Requests
}

Expand Down Expand Up @@ -123,11 +125,11 @@ func NewClient(
}

client := &Client{
conn: *conn,
index: s.Index,
pipeline: pipeline,

observer: s.Observer,
conn: *conn,
index: s.Index,
pipeline: pipeline,
observer: s.Observer,
NonIndexableAction: s.NonIndexableAction,

log: logp.NewLogger("elasticsearch"),
}
Expand Down Expand Up @@ -166,6 +168,7 @@ func (client *Client) Clone() *Client {
ConnectionSettings: connection,
Index: client.index,
Pipeline: client.pipeline,
NonIndexableAction: client.NonIndexableAction,
},
nil, // XXX: do not pass connection callback?
)
Expand Down Expand Up @@ -204,7 +207,7 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
// events slice
origCount := len(data)
span.Context.SetLabel("events_original", origCount)
data, bulkItems := bulkEncodePublishRequest(client.log, client.conn.GetVersion(), client.index, client.pipeline, data)
data, bulkItems := client.bulkEncodePublishRequest(client.conn.GetVersion(), data)
newCount := len(data)
span.Context.SetLabel("events_encoded", newCount)
if st != nil && origCount > newCount {
Expand Down Expand Up @@ -235,7 +238,7 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
failedEvents = data
stats.fails = len(failedEvents)
} else {
failedEvents, stats = bulkCollectPublishFails(client.log, result, data)
failedEvents, stats = client.bulkCollectPublishFails(result, data)
}

failed := len(failedEvents)
Expand Down Expand Up @@ -263,21 +266,14 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)

// bulkEncodePublishRequest encodes all bulk requests and returns slice of events
// successfully added to the list of bulk items and the list of bulk items.
func bulkEncodePublishRequest(
log *logp.Logger,
version common.Version,
index outputs.IndexSelector,
pipeline *outil.Selector,
data []publisher.Event,
) ([]publisher.Event, []interface{}) {

func (client *Client) bulkEncodePublishRequest(version common.Version, data []publisher.Event) ([]publisher.Event, []interface{}) {
okEvents := data[:0]
bulkItems := []interface{}{}
for i := range data {
event := &data[i].Content
meta, err := createEventBulkMeta(log, version, index, pipeline, event)
meta, err := client.createEventBulkMeta(version, event)
if err != nil {
log.Errorf("Failed to encode event meta data: %+v", err)
client.log.Errorf("Failed to encode event meta data: %+v", err)
continue
}
if opType := events.GetOpType(*event); opType == events.OpTypeDelete {
Expand All @@ -291,25 +287,19 @@ func bulkEncodePublishRequest(
return okEvents, bulkItems
}

func createEventBulkMeta(
log *logp.Logger,
version common.Version,
indexSel outputs.IndexSelector,
pipelineSel *outil.Selector,
event *beat.Event,
) (interface{}, error) {
func (client *Client) createEventBulkMeta(version common.Version, event *beat.Event) (interface{}, error) {
eventType := ""
if version.Major < 7 {
eventType = defaultEventType
}

pipeline, err := getPipeline(event, pipelineSel)
pipeline, err := client.getPipeline(event)
if err != nil {
err := fmt.Errorf("failed to select pipeline: %v", err)
return nil, err
}

index, err := indexSel.Select(event)
index, err := client.index.Select(event)
if err != nil {
err := fmt.Errorf("failed to select event index: %v", err)
return nil, err
Expand Down Expand Up @@ -341,7 +331,7 @@ func createEventBulkMeta(
return eslegclient.BulkIndexAction{Index: meta}, nil
}

func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error) {
func (client *Client) getPipeline(event *beat.Event) (string, error) {
if event.Meta != nil {
pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline)
if err == common.ErrKeyNotFound {
Expand All @@ -354,8 +344,8 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error)
return strings.ToLower(pipeline), nil
}

if pipelineSel != nil {
return pipelineSel.Select(event)
if client.pipeline != nil {
return client.pipeline.Select(event)
}
return "", nil
}
Expand All @@ -364,24 +354,20 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error)
// to be tried again due to error code returned for that items. If indexing an
// event failed due to some error in the event itself (e.g. does not respect mapping),
// the event will be dropped.
func bulkCollectPublishFails(
log *logp.Logger,
result eslegclient.BulkResult,
data []publisher.Event,
) ([]publisher.Event, bulkResultStats) {
func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, data []publisher.Event) ([]publisher.Event, bulkResultStats) {
reader := newJSONReader(result)
if err := bulkReadToItems(reader); err != nil {
log.Errorf("failed to parse bulk response: %v", err.Error())
client.log.Errorf("failed to parse bulk response: %v", err.Error())
return nil, bulkResultStats{}
}

count := len(data)
failed := data[:0]
stats := bulkResultStats{}
for i := 0; i < count; i++ {
status, msg, err := bulkReadItemStatus(log, reader)
status, msg, err := bulkReadItemStatus(client.log, reader)
if err != nil {
log.Error(err)
client.log.Error(err)
return nil, bulkResultStats{}
}

Expand All @@ -401,14 +387,35 @@ func bulkCollectPublishFails(
if status == http.StatusTooManyRequests {
stats.tooMany++
} else {
// hard failure, don't collect
log.Warnf("Cannot index event %#v (status=%v): %s", data[i], status, msg)
stats.nonIndexable++
continue
// hard failure, apply policy action
result, _ := data[i].Content.Meta.HasKey(dead_letter_marker_field)
if result {
stats.nonIndexable++
client.log.Errorf("Can't deliver to dead letter index event %#v (status=%v): %s", data[i], status, msg)
// poison pill - this will clog the pipeline if the underlying failure is non transient.
} else if client.NonIndexableAction == dead_letter_index {
client.log.Warnf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg)
if data[i].Content.Meta == nil {
data[i].Content.Meta = common.MapStr{
dead_letter_marker_field: true,
}
} else {
data[i].Content.Meta.Put(dead_letter_marker_field, true)
}
data[i].Content.Fields = common.MapStr{
"message": data[i].Content.Fields.String(),
"error.type": status,
"error.message": string(msg),
}
} else { // drop
stats.nonIndexable++
client.log.Warnf("Cannot index event %#v (status=%v): %s, dropping event!", data[i], status, msg)
continue
}
}
}

log.Debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
client.log.Debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
stats.fails++
failed = append(failed, data[i])
}
Expand Down
64 changes: 64 additions & 0 deletions libbeat/outputs/elasticsearch/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,70 @@ func TestClientPublishEventWithPipeline(t *testing.T) {
assert.Equal(t, 1, getCount("testfield:0")) // no pipeline
}

// TestClientBulkPublishEventsWithDeadletterIndex is an integration test for the
// `non_indexable_policy: dead_letter_index` output setting. It verifies that an
// event which cannot be indexed (mapping conflict) is retried into the configured
// dead letter index instead of being dropped.
// NOTE(review): requires a reachable Elasticsearch instance (integration test).
func TestClientBulkPublishEventsWithDeadletterIndex(t *testing.T) {
	type obj map[string]interface{}

	logp.TestingSetup(logp.WithSelectors("elasticsearch"))

	index := "beat-int-test-dli-index"
	deadletterIndex := "beat-int-test-dli-dead-letter-index"

	// Configure the output with the dead_letter_index policy pointing at a
	// separate index from the primary one.
	output, client := connectTestEsWithoutStats(t, obj{
		"index": index,
		"non_indexable_policy": map[string]interface{}{
			"dead_letter_index": map[string]interface{}{
				"index": deadletterIndex,
			},
		},
	})
	// Start from a clean slate; deletion errors are deliberately ignored since
	// the indices may not exist yet.
	client.conn.Delete(index, "", "", nil)
	client.conn.Delete(deadletterIndex, "", "", nil)

	// First publish succeeds and pins the mapping of "testfield" to a number.
	err := output.Publish(context.Background(), outest.NewBatch(beat.Event{
		Timestamp: time.Now(),
		Fields: common.MapStr{
			"type":      "libbeat",
			"message":   "Test message 1",
			"testfield": 0,
		},
	}))
	if err != nil {
		t.Fatal(err)
	}

	// Second event sends a string for "testfield", which now conflicts with the
	// numeric mapping established above and must fail to index.
	batch := outest.NewBatch(beat.Event{
		Timestamp: time.Now(),
		Fields: common.MapStr{
			"type":      "libbeat",
			"message":   "Test message 2",
			"testfield": "foo0",
		},
	})
	// The first publish of the conflicting batch is expected to fail: the event
	// is marked for the dead letter index and scheduled for retry.
	err = output.Publish(context.Background(), batch)
	if err == nil {
		t.Fatal("Expecting mapping conflict")
	}
	// The dead letter index must not exist yet — nothing has been written to it
	// before the retry happens.
	_, _, err = client.conn.Refresh(deadletterIndex)
	if err == nil {
		t.Fatal("expecting index to not exist yet")
	}
	// Retrying the batch should now deliver the event to the dead letter index.
	err = output.Publish(context.Background(), batch)
	if err != nil {
		t.Fatal(err)
	}

	// Both indices should exist after the retry (Refresh fails on a missing
	// index). NOTE(review): the test only checks index existence; it does not
	// assert document counts or the reshaped message/error fields — consider
	// strengthening.
	_, _, err = client.conn.Refresh(index)
	if err != nil {
		t.Fatal(err)
	}

	_, _, err = client.conn.Refresh(deadletterIndex)
	if err != nil {
		t.Fatal(err)
	}

}

func TestClientBulkPublishEventsWithPipeline(t *testing.T) {
type obj map[string]interface{}

Expand Down
Loading