Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log monitoring bulk failures #14356

Merged
merged 9 commits into from
Nov 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update azure configuration example. {issue}14224[14224]
- Fix cloudwatch metricset with names and dimensions in config. {issue}14376[14376] {pull}14391[14391]
- Fix marshaling of ms-since-epoch values in `elasticsearch/cluster_stats` metricset. {pull}14378[14378]
- Log bulk failures from bulk API requests to monitoring cluster. {issue}14303[14303] {pull}14356[14356]

*Packetbeat*

Expand Down
31 changes: 30 additions & 1 deletion libbeat/monitoring/report/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package elasticsearch
import (
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -229,7 +230,12 @@ func (c *publishClient) publishBulk(event publisher.Event, typ string) error {
// Currently one request per event is sent. Reason is that each event can contain different
// interval params and X-Pack requires to send the interval param.
// FIXME: index name (first param below)
_, err = c.es.BulkWith(getMonitoringIndexName(), "", nil, nil, bulk[:])
result, err := c.es.BulkWith(getMonitoringIndexName(), "", nil, nil, bulk[:])
if err != nil {
return err
}

logBulkFailures(result, []report.Event{document})
return err
}

Expand All @@ -238,3 +244,26 @@ func getMonitoringIndexName() string {
date := time.Now().Format("2006.01.02")
return fmt.Sprintf(".monitoring-beats-%v-%s", version, date)
}

func logBulkFailures(result esout.BulkResult, events []report.Event) {
reader := esout.NewJSONReader(result)
err := esout.BulkReadToItems(reader)
if err != nil {
logp.Err("failed to parse monitoring bulk items: %v", err)
return
}

for i := range events {
status, msg, err := esout.BulkReadItemStatus(reader)
if err != nil {
logp.Err("failed to parse monitoring bulk item status: %v", err)
return
}
switch {
case status < 300, status == http.StatusConflict:
continue
default:
logp.Warn("monitoring bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
}
}
}
29 changes: 10 additions & 19 deletions libbeat/outputs/elasticsearch/bulkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package elasticsearch

import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"net/http"
Expand All @@ -34,16 +35,15 @@ type bulkRequest struct {
requ *http.Request
}

type bulkResult struct {
raw []byte
}
// BulkResult contains the result of a bulk API request.
type BulkResult json.RawMessage
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 nice change


// Bulk performs many index/delete operations in a single API call.
// Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
func (conn *Connection) Bulk(
index, docType string,
params map[string]string, body []interface{},
) (*QueryResult, error) {
) (BulkResult, error) {
return conn.BulkWith(index, docType, params, nil, body)
}

Expand All @@ -56,7 +56,7 @@ func (conn *Connection) BulkWith(
params map[string]string,
metaBuilder MetaBuilder,
body []interface{},
) (*QueryResult, error) {
) (BulkResult, error) {
if len(body) == 0 {
return nil, nil
}
Expand All @@ -76,7 +76,7 @@ func (conn *Connection) BulkWith(
if err != nil {
return nil, err
}
return readQueryResult(result.raw)
return result, nil
}

// SendMonitoringBulk creates a HTTP request to the X-Pack Monitoring API containing a bunch of
Expand All @@ -85,7 +85,7 @@ func (conn *Connection) BulkWith(
func (conn *Connection) SendMonitoringBulk(
params map[string]string,
body []interface{},
) (*QueryResult, error) {
) (BulkResult, error) {
if len(body) == 0 {
return nil, nil
}
Expand All @@ -111,7 +111,7 @@ func (conn *Connection) SendMonitoringBulk(
if err != nil {
return nil, err
}
return readQueryResult(result.raw)
return result, nil
}

func newBulkRequest(
Expand Down Expand Up @@ -199,18 +199,9 @@ func (r *bulkRequest) Reset(body bodyEncoder) {
body.AddHeader(&r.requ.Header)
}

func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, bulkResult, error) {
func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, BulkResult, error) {
status, resp, err := conn.execHTTPRequest(requ.requ)
if err != nil {
return status, bulkResult{}, err
}

result, err := readBulkResult(resp)
return status, result, err
}

func readBulkResult(obj []byte) (bulkResult, error) {
return bulkResult{obj}, nil
return status, BulkResult(resp), err
}

func bulkEncode(out bulkWriter, metaBuilder MetaBuilder, body []interface{}) error {
Expand Down
8 changes: 2 additions & 6 deletions libbeat/outputs/elasticsearch/bulkapi_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package elasticsearch

import (
"encoding/json"
"fmt"
"net/http"
"os"
Expand All @@ -34,7 +33,7 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("elasticsearch"))

index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid())
expectedResp, _ := json.Marshal(QueryResult{Ok: true, Index: index, Type: "type1", ID: "1", Version: 1, Created: true})
expectedResp := []byte(`{"took":7,"errors":false,"items":[]}`)

ops := []map[string]interface{}{
{
Expand All @@ -61,13 +60,10 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) {
params := map[string]string{
"refresh": "true",
}
resp, err := client.Bulk(index, "type1", params, body)
_, err := client.Bulk(index, "type1", params, body)
if err != nil {
t.Errorf("Bulk() returns error: %s", err)
}
if !resp.Created {
t.Errorf("Bulk() fails: %s", resp)
}
}

func TestOneHost500Resp_Bulk(t *testing.T) {
Expand Down
82 changes: 45 additions & 37 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Client struct {
bulkRequ *bulkRequest

// buffered json response reader
json jsonReader
json JSONReader

// additional configs
compressionLevel int
Expand Down Expand Up @@ -128,6 +128,7 @@ var (
)

var (
errExpectedItemsArray = errors.New("expected items array")
errExpectedItemObject = errors.New("expected item response object")
errExpectedStatusCode = errors.New("expected item status code")
errUnexpectedEmptyObject = errors.New("empty object")
Expand Down Expand Up @@ -360,7 +361,7 @@ func (client *Client) publishEvents(
failedEvents = data
stats.fails = len(failedEvents)
} else {
client.json.init(result.raw)
client.json.init(result)
failedEvents, stats = bulkCollectPublishFails(&client.json, data)
}

Expand Down Expand Up @@ -478,46 +479,19 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error)
// event failed due to some error in the event itself (e.g. does not respect mapping),
// the event will be dropped.
func bulkCollectPublishFails(
reader *jsonReader,
reader *JSONReader,
data []publisher.Event,
) ([]publisher.Event, bulkResultStats) {
if err := reader.expectDict(); err != nil {
logp.Err("Failed to parse bulk response: expected JSON object")
return nil, bulkResultStats{}
}

// find 'items' field in response
for {
kind, name, err := reader.nextFieldName()
if err != nil {
logp.Err("Failed to parse bulk response")
return nil, bulkResultStats{}
}

if kind == dictEnd {
logp.Err("Failed to parse bulk response: no 'items' field in response")
return nil, bulkResultStats{}
}

// found items array -> continue
if bytes.Equal(name, nameItems) {
break
}

reader.ignoreNext()
}

// check items field is an array
if err := reader.expectArray(); err != nil {
logp.Err("Failed to parse bulk response: expected items array")
if err := BulkReadToItems(reader); err != nil {
logp.Err("failed to parse bulk response: %v", err.Error())
return nil, bulkResultStats{}
}

count := len(data)
failed := data[:0]
stats := bulkResultStats{}
for i := 0; i < count; i++ {
status, msg, err := itemStatus(reader)
status, msg, err := BulkReadItemStatus(reader)
if err != nil {
return nil, bulkResultStats{}
}
Expand Down Expand Up @@ -553,9 +527,43 @@ func bulkCollectPublishFails(
return failed, stats
}

func itemStatus(reader *jsonReader) (int, []byte, error) {
// BulkReadToItems reads the bulk response up to (but not including) items
ycombinator marked this conversation as resolved.
Show resolved Hide resolved
func BulkReadToItems(reader *JSONReader) error {
ph marked this conversation as resolved.
Show resolved Hide resolved
if err := reader.ExpectDict(); err != nil {
return errExpectedObject
}

// find 'items' field in response
for {
kind, name, err := reader.nextFieldName()
if err != nil {
return err
}

if kind == dictEnd {
return errExpectedItemsArray
}

// found items array -> continue
if bytes.Equal(name, nameItems) {
break
}

reader.ignoreNext()
}

// check items field is an array
if err := reader.ExpectArray(); err != nil {
return errExpectedItemsArray
}

return nil
}

// BulkReadItemStatus reads the status and error fields from the bulk item
func BulkReadItemStatus(reader *JSONReader) (int, []byte, error) {
ph marked this conversation as resolved.
Show resolved Hide resolved
// skip outer dictionary
if err := reader.expectDict(); err != nil {
if err := reader.ExpectDict(); err != nil {
return 0, nil, errExpectedItemObject
}

Expand Down Expand Up @@ -593,8 +601,8 @@ func itemStatus(reader *jsonReader) (int, []byte, error) {
return status, msg, nil
}

func itemStatusInner(reader *jsonReader) (int, []byte, error) {
if err := reader.expectDict(); err != nil {
func itemStatusInner(reader *JSONReader) (int, []byte, error) {
if err := reader.ExpectDict(); err != nil {
return 0, nil, errExpectedItemObject
}

Expand Down
Loading