Skip to content

Commit

Permalink
Drop event batch when get HTTP status 413 from ES (#29368)
Browse files Browse the repository at this point in the history
To prevent infinite loops when having `http.max_content_length` set
too low or `bulk_max_size` too high we now handle this status code
separately and drop the whole event batch producing a detailed error
message on the console.
  • Loading branch information
rdner authored Dec 16, 2021
1 parent b5e9414 commit d401681
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Remove `event.dataset` (ECS) annotion from `libbeat.logp`. {issue}27404[27404]
- Errors should be thrown as errors. Metricsets inside Metricbeat will now throw errors as the `error` log level. {pull}27804[27804]
- Avoid panicking in `add_fields` processor when input event.Fields is a nil map. {pull}28219[28219]
- Drop event batch when get HTTP status 413 from Elasticsearch to avoid infinite loop {issue}14350[14350] {pull}29368[29368]

==== Added

Expand Down
14 changes: 12 additions & 2 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"github.com/elastic/beats/v7/libbeat/testing"
)

var errPayloadTooLarge = errors.New("the bulk payload is too large for the server. Consider to adjust `http.max_content_length` parameter in Elasticsearch or `bulk_max_size` in the beat. The batch has been dropped")

// Client is an elasticsearch client.
type Client struct {
conn eslegclient.Connection
Expand Down Expand Up @@ -180,9 +182,13 @@ func (client *Client) Clone() *Client {
func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error {
events := batch.Events()
rest, err := client.publishEvents(ctx, events)
if len(rest) == 0 {

switch {
case err == errPayloadTooLarge:
batch.Drop()
case len(rest) == 0:
batch.ACK()
} else {
default:
batch.RetryEvents(rest)
}
return err
Expand Down Expand Up @@ -220,7 +226,11 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
}

status, result, sendErr := client.conn.Bulk(ctx, "", "", nil, bulkItems)

if sendErr != nil {
if status == http.StatusRequestEntityTooLarge {
sendErr = errPayloadTooLarge
}
err := apm.CaptureError(ctx, fmt.Errorf("failed to perform any bulk index operations: %w", sendErr))
err.Send()
client.log.Error(err)
Expand Down
97 changes: 97 additions & 0 deletions libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,103 @@ import (
"github.com/elastic/beats/v7/libbeat/version"
)

type testIndexSelector struct{}

func (testIndexSelector) Select(event *beat.Event) (string, error) {
return "test", nil
}

type batchMock struct {
// we embed the interface so we are able to implement the interface partially,
// only functions needed for tests are implemented
// if you use a function that is not implemented in the mock it will panic
publisher.Batch
events []publisher.Event
ack bool
drop bool
retryEvents []publisher.Event
}

func (bm batchMock) Events() []publisher.Event {
return bm.events
}
func (bm *batchMock) ACK() {
bm.ack = true
}
func (bm *batchMock) Drop() {
bm.drop = true
}
func (bm *batchMock) RetryEvents(events []publisher.Event) {
bm.retryEvents = events
}

func TestPublishStatusCode(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

event := publisher.Event{Content: beat.Event{Fields: common.MapStr{"field": 1}}}
events := []publisher.Event{event}

t.Run("returns pre-defined error and drops batch when 413", func(t *testing.T) {
esMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusRequestEntityTooLarge)
w.Write([]byte("Request failed to get to the server (status code: 413)")) // actual response from ES
}))
defer esMock.Close()

client, err := NewClient(
ClientSettings{
ConnectionSettings: eslegclient.ConnectionSettings{
URL: esMock.URL,
},
Index: testIndexSelector{},
},
nil,
)
assert.NoError(t, err)

event := publisher.Event{Content: beat.Event{Fields: common.MapStr{"field": 1}}}
events := []publisher.Event{event}
batch := &batchMock{
events: events,
}

err = client.Publish(ctx, batch)

assert.Error(t, err)
assert.Equal(t, errPayloadTooLarge, err, "should be a pre-defined error")
assert.True(t, batch.drop, "should must be dropped")
})

t.Run("retries the batch if bad HTTP status", func(t *testing.T) {
esMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
defer esMock.Close()

client, err := NewClient(
ClientSettings{
ConnectionSettings: eslegclient.ConnectionSettings{
URL: esMock.URL,
},
Index: testIndexSelector{},
},
nil,
)
assert.NoError(t, err)

batch := &batchMock{
events: events,
}

err = client.Publish(ctx, batch)

assert.Error(t, err)
assert.False(t, batch.ack, "should not be acknowledged")
assert.Len(t, batch.retryEvents, len(events), "all events should be in retry")
})
}

func TestCollectPublishFailsNone(t *testing.T) {
client, err := NewClient(
ClientSettings{
Expand Down

0 comments on commit d401681

Please sign in to comment.