Log monitoring bulk failures (#14356)
* Log monitoring bulk failures

* Renaming function

* Simplifying type

* Removing extraneous second value

* Adding godoc comments

* Adding CHANGELOG entry

* Clarifying log messages

* WIP: adding unit test stubs

* Fleshing out unit tests
ycombinator authored Nov 14, 2019
1 parent 1eb90bd commit a9aff6f
Showing 7 changed files with 192 additions and 110 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -251,6 +251,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update azure configuration example. {issue}14224[14224]
- Fix cloudwatch metricset with names and dimensions in config. {issue}14376[14376] {pull}14391[14391]
- Fix marshaling of ms-since-epoch values in `elasticsearch/cluster_stats` metricset. {pull}14378[14378]
- Log bulk failures from bulk API requests to monitoring cluster. {issue}14303[14303] {pull}14356[14356]

*Packetbeat*

31 changes: 30 additions & 1 deletion libbeat/monitoring/report/elasticsearch/client.go
@@ -20,6 +20,7 @@ package elasticsearch
import (
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/pkg/errors"
@@ -229,7 +230,12 @@ func (c *publishClient) publishBulk(event publisher.Event, typ string) error {
// Currently one request per event is sent. The reason is that each event can contain different
// interval params, and X-Pack requires the interval param to be sent.
// FIXME: index name (first param below)
_, err = c.es.BulkWith(getMonitoringIndexName(), "", nil, nil, bulk[:])
result, err := c.es.BulkWith(getMonitoringIndexName(), "", nil, nil, bulk[:])
if err != nil {
return err
}

logBulkFailures(result, []report.Event{document})
return err
}

@@ -238,3 +244,26 @@ func getMonitoringIndexName() string {
date := time.Now().Format("2006.01.02")
return fmt.Sprintf(".monitoring-beats-%v-%s", version, date)
}

func logBulkFailures(result esout.BulkResult, events []report.Event) {
reader := esout.NewJSONReader(result)
err := esout.BulkReadToItems(reader)
if err != nil {
logp.Err("failed to parse monitoring bulk items: %v", err)
return
}

for i := range events {
status, msg, err := esout.BulkReadItemStatus(reader)
if err != nil {
logp.Err("failed to parse monitoring bulk item status: %v", err)
return
}
switch {
case status < 300, status == http.StatusConflict:
continue
default:
logp.Warn("monitoring bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
}
}
}
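
For reference, the classification above can be reproduced outside the reporter: statuses below 300 and 409 (conflict) count as success, anything else is logged. The following standalone sketch decodes a bulk response with plain encoding/json instead of the package's internal JSON reader; the response body, struct, and log wording are illustrative only, not taken from the library.

package main

import (
	"encoding/json"
	"log"
	"net/http"
)

// bulkResponse mirrors just enough of the Elasticsearch bulk API response to
// classify per-item failures (an illustrative struct, not the library's own type).
type bulkResponse struct {
	Items []map[string]struct {
		Status int             `json:"status"`
		Error  json.RawMessage `json:"error"`
	} `json:"items"`
}

func main() {
	raw := []byte(`{"took":7,"errors":true,"items":[
		{"index":{"status":201}},
		{"index":{"status":409}},
		{"index":{"status":400,"error":{"type":"mapper_parsing_exception"}}}]}`)

	var resp bulkResponse
	if err := json.Unmarshal(raw, &resp); err != nil {
		log.Fatalf("failed to parse bulk response: %v", err)
	}

	for i, item := range resp.Items {
		for _, action := range item { // each item has a single action key ("index", "create", ...)
			switch {
			case action.Status < 300, action.Status == http.StatusConflict:
				// 2xx and 409 (conflict) are treated as success and skipped.
				continue
			default:
				log.Printf("bulk item insert failed (i=%v, status=%v): %s", i, action.Status, action.Error)
			}
		}
	}
}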
29 changes: 10 additions & 19 deletions libbeat/outputs/elasticsearch/bulkapi.go
@@ -19,6 +19,7 @@ package elasticsearch

import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"net/http"
@@ -34,16 +35,15 @@ type bulkRequest struct {
requ *http.Request
}

type bulkResult struct {
raw []byte
}
// BulkResult contains the result of a bulk API request.
type BulkResult json.RawMessage

// Bulk performs many index/delete operations in a single API call.
// Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
func (conn *Connection) Bulk(
index, docType string,
params map[string]string, body []interface{},
) (*QueryResult, error) {
) (BulkResult, error) {
return conn.BulkWith(index, docType, params, nil, body)
}

@@ -56,7 +56,7 @@ func (conn *Connection) BulkWith(
params map[string]string,
metaBuilder MetaBuilder,
body []interface{},
) (*QueryResult, error) {
) (BulkResult, error) {
if len(body) == 0 {
return nil, nil
}
@@ -76,7 +76,7 @@ func (conn *Connection) BulkWith(
if err != nil {
return nil, err
}
return readQueryResult(result.raw)
return result, nil
}

// SendMonitoringBulk creates a HTTP request to the X-Pack Monitoring API containing a bunch of
@@ -85,7 +85,7 @@ func (conn *Connection) BulkWith(
func (conn *Connection) SendMonitoringBulk(
params map[string]string,
body []interface{},
) (*QueryResult, error) {
) (BulkResult, error) {
if len(body) == 0 {
return nil, nil
}
@@ -111,7 +111,7 @@ func (conn *Connection) SendMonitoringBulk(
if err != nil {
return nil, err
}
return readQueryResult(result.raw)
return result, nil
}

func newBulkRequest(
@@ -199,18 +199,9 @@ func (r *bulkRequest) Reset(body bodyEncoder) {
body.AddHeader(&r.requ.Header)
}

func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, bulkResult, error) {
func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, BulkResult, error) {
status, resp, err := conn.execHTTPRequest(requ.requ)
if err != nil {
return status, bulkResult{}, err
}

result, err := readBulkResult(resp)
return status, result, err
}

func readBulkResult(obj []byte) (bulkResult, error) {
return bulkResult{obj}, nil
return status, BulkResult(resp), err
}

func bulkEncode(out bulkWriter, metaBuilder MetaBuilder, body []interface{}) error {
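
With BulkResult now an alias of json.RawMessage, callers that previously read fields off *QueryResult can decode the payload themselves. A minimal caller-side sketch, assuming the import path of this tree at the time of the commit; bulkSummary is a hypothetical struct for illustration, not a type defined in libbeat.

package main

import (
	"encoding/json"
	"fmt"

	esout "github.com/elastic/beats/libbeat/outputs/elasticsearch"
)

// bulkSummary is a hypothetical caller-side view of a bulk response.
type bulkSummary struct {
	Took   int               `json:"took"`
	Errors bool              `json:"errors"`
	Items  []json.RawMessage `json:"items"`
}

// summarize decodes the raw payload returned by Bulk/BulkWith.
func summarize(result esout.BulkResult) (*bulkSummary, error) {
	if result == nil { // BulkWith returns (nil, nil) for an empty body
		return nil, nil
	}
	var s bulkSummary
	if err := json.Unmarshal(result, &s); err != nil {
		return nil, err
	}
	return &s, nil
}

func main() {
	s, err := summarize(esout.BulkResult(`{"took":7,"errors":false,"items":[]}`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("took=%dms errors=%v items=%d\n", s.Took, s.Errors, len(s.Items))
}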
8 changes: 2 additions & 6 deletions libbeat/outputs/elasticsearch/bulkapi_mock_test.go
@@ -20,7 +20,6 @@
package elasticsearch

import (
"encoding/json"
"fmt"
"net/http"
"os"
@@ -34,7 +33,7 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("elasticsearch"))

index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid())
expectedResp, _ := json.Marshal(QueryResult{Ok: true, Index: index, Type: "type1", ID: "1", Version: 1, Created: true})
expectedResp := []byte(`{"took":7,"errors":false,"items":[]}`)

ops := []map[string]interface{}{
{
@@ -61,13 +60,10 @@
params := map[string]string{
"refresh": "true",
}
resp, err := client.Bulk(index, "type1", params, body)
_, err := client.Bulk(index, "type1", params, body)
if err != nil {
t.Errorf("Bulk() returns error: %s", err)
}
if !resp.Created {
t.Errorf("Bulk() fails: %s", resp)
}
}

func TestOneHost500Resp_Bulk(t *testing.T) {
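
The updated test expects a canned bulk-style body rather than a marshaled QueryResult. As a rough standalone illustration (the repository uses its own mock client helpers, not this), a net/http/httptest server returning the same canned response would look like:

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/http/httptest"
)

func main() {
	// Fake _bulk endpoint that always returns an empty, error-free bulk result.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, `{"took":7,"errors":false,"items":[]}`)
	}))
	defer srv.Close()

	resp, err := http.Post(srv.URL+"/_bulk", "application/x-ndjson", nil)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Printf("status=%d body=%s\n", resp.StatusCode, body)
}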
82 changes: 45 additions & 37 deletions libbeat/outputs/elasticsearch/client.go
@@ -53,7 +53,7 @@ type Client struct {
bulkRequ *bulkRequest

// buffered json response reader
json jsonReader
json JSONReader

// additional configs
compressionLevel int
@@ -128,6 +128,7 @@
)

var (
errExpectedItemsArray = errors.New("expected items array")
errExpectedItemObject = errors.New("expected item response object")
errExpectedStatusCode = errors.New("expected item status code")
errUnexpectedEmptyObject = errors.New("empty object")
@@ -360,7 +361,7 @@ func (client *Client) publishEvents(
failedEvents = data
stats.fails = len(failedEvents)
} else {
client.json.init(result.raw)
client.json.init(result)
failedEvents, stats = bulkCollectPublishFails(&client.json, data)
}

@@ -478,46 +479,19 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error)
// event failed due to some error in the event itself (e.g. does not respect mapping),
// the event will be dropped.
func bulkCollectPublishFails(
reader *jsonReader,
reader *JSONReader,
data []publisher.Event,
) ([]publisher.Event, bulkResultStats) {
if err := reader.expectDict(); err != nil {
logp.Err("Failed to parse bulk response: expected JSON object")
return nil, bulkResultStats{}
}

// find 'items' field in response
for {
kind, name, err := reader.nextFieldName()
if err != nil {
logp.Err("Failed to parse bulk response")
return nil, bulkResultStats{}
}

if kind == dictEnd {
logp.Err("Failed to parse bulk response: no 'items' field in response")
return nil, bulkResultStats{}
}

// found items array -> continue
if bytes.Equal(name, nameItems) {
break
}

reader.ignoreNext()
}

// check items field is an array
if err := reader.expectArray(); err != nil {
logp.Err("Failed to parse bulk response: expected items array")
if err := BulkReadToItems(reader); err != nil {
logp.Err("failed to parse bulk response: %v", err.Error())
return nil, bulkResultStats{}
}

count := len(data)
failed := data[:0]
stats := bulkResultStats{}
for i := 0; i < count; i++ {
status, msg, err := itemStatus(reader)
status, msg, err := BulkReadItemStatus(reader)
if err != nil {
return nil, bulkResultStats{}
}
Expand Down Expand Up @@ -553,9 +527,43 @@ func bulkCollectPublishFails(
return failed, stats
}

func itemStatus(reader *jsonReader) (int, []byte, error) {
// BulkReadToItems reads the bulk response up to (but not including) items
func BulkReadToItems(reader *JSONReader) error {
if err := reader.ExpectDict(); err != nil {
return errExpectedObject
}

// find 'items' field in response
for {
kind, name, err := reader.nextFieldName()
if err != nil {
return err
}

if kind == dictEnd {
return errExpectedItemsArray
}

// found items array -> continue
if bytes.Equal(name, nameItems) {
break
}

reader.ignoreNext()
}

// check items field is an array
if err := reader.ExpectArray(); err != nil {
return errExpectedItemsArray
}

return nil
}

// BulkReadItemStatus reads the status and error fields from the bulk item
func BulkReadItemStatus(reader *JSONReader) (int, []byte, error) {
// skip outer dictionary
if err := reader.expectDict(); err != nil {
if err := reader.ExpectDict(); err != nil {
return 0, nil, errExpectedItemObject
}

@@ -593,8 +601,8 @@ func itemStatus(reader *jsonReader) (int, []byte, error) {
return status, msg, nil
}

func itemStatusInner(reader *jsonReader) (int, []byte, error) {
if err := reader.expectDict(); err != nil {
func itemStatusInner(reader *JSONReader) (int, []byte, error) {
if err := reader.ExpectDict(); err != nil {
return 0, nil, errExpectedItemObject
}

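
BulkReadToItems and BulkReadItemStatus are exported here so the monitoring reporter (and other callers) can walk a bulk response item by item. A caller-side sketch follows; the NewJSONReader constructor is assumed from its use in the monitoring reporter diff above, as the json_read.go changes are not included in this excerpt.

package main

import (
	"fmt"

	esout "github.com/elastic/beats/libbeat/outputs/elasticsearch"
)

func main() {
	raw := []byte(`{"took":3,"errors":true,"items":[
		{"index":{"status":201}},
		{"index":{"status":400,"error":{"type":"mapper_parsing_exception","reason":"boom"}}}]}`)

	// Assumed: NewJSONReader wraps a raw byte slice in a *JSONReader.
	reader := esout.NewJSONReader(raw)
	if err := esout.BulkReadToItems(reader); err != nil {
		fmt.Println("failed to reach items array:", err)
		return
	}

	// Two documents were submitted, so read two item statuses in order.
	for i := 0; i < 2; i++ {
		status, msg, err := esout.BulkReadItemStatus(reader)
		if err != nil {
			fmt.Println("failed to parse item status:", err)
			return
		}
		fmt.Printf("item %d: status=%d error=%s\n", i, status, msg)
	}
}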