[7.5] Log monitoring bulk failures (#14356) (#14527)
* Log monitoring bulk failures (#14356)

* Log monitoring bulk failures

* Renaming function

* Simplifying type

* Removing extraneous second value

* Adding godoc comments

* Adding CHANGELOG entry

* Clarifying log messages

* WIP: adding unit test stubs

* Fleshing out unit tests

* [DOCS] Deprecate central management (#14104) (#14594)

* State minimum Go version (#14400) (#14598)

* [DOCS] Fix description of rename processor (#14408) (#14600)

* Fixing up CHANGELOG
ycombinator authored Nov 19, 2019
1 parent 4e9d1a2 commit c1be1ae
Showing 7 changed files with 198 additions and 110 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.next.asciidoc
@@ -58,6 +58,13 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

- Fix checking tagsFilter using length in cloudwatch metricset. {pull}14525[14525]
- Fixed bug with `elasticsearch/cluster_stats` metricset not recording license expiration date correctly. {issue}14541[14541] {pull}14591[14591]
- Convert indexed ms-since-epoch timestamp fields in `elasticsearch/ml_job` metricset to ints from float64s. {issue}14220[14220] {pull}14222[14222]
- Fix ARN parsing function to work for ELB ARNs. {pull}14316[14316]
- Update azure configuration example. {issue}14224[14224]
- Limit some of the error messages to the logs only. {issue}14317[14317] {pull}14327[14327]
- Fix cloudwatch metricset with names and dimensions in config. {issue}14376[14376] {pull}14391[14391]
- Fix marshaling of ms-since-epoch values in `elasticsearch/cluster_stats` metricset. {pull}14378[14378]
- Log bulk failures from bulk API requests to monitoring cluster. {issue}14303[14303] {pull}14356[14356]

*Packetbeat*

31 changes: 30 additions & 1 deletion libbeat/monitoring/report/elasticsearch/client.go
@@ -20,6 +20,7 @@ package elasticsearch
import (
"encoding/json"
"fmt"
+ "net/http"
"time"

"github.com/pkg/errors"
@@ -229,7 +230,12 @@ func (c *publishClient) publishBulk(event publisher.Event, typ string) error {
// Currently one request per event is sent, because each event can contain different
// interval params and X-Pack requires the interval param to be sent.
// FIXME: index name (first param below)
- _, err = c.es.BulkWith(getMonitoringIndexName(), "", nil, nil, bulk[:])
+ result, err := c.es.BulkWith(getMonitoringIndexName(), "", nil, nil, bulk[:])
+ if err != nil {
+ return err
+ }

+ logBulkFailures(result, []report.Event{document})
return err
}

@@ -238,3 +244,26 @@ func getMonitoringIndexName() string {
date := time.Now().Format("2006.01.02")
return fmt.Sprintf(".monitoring-beats-%v-%s", version, date)
}

+ func logBulkFailures(result esout.BulkResult, events []report.Event) {
+ reader := esout.NewJSONReader(result)
+ err := esout.BulkReadToItems(reader)
+ if err != nil {
+ logp.Err("failed to parse monitoring bulk items: %v", err)
+ return
+ }

+ for i := range events {
+ status, msg, err := esout.BulkReadItemStatus(reader)
+ if err != nil {
+ logp.Err("failed to parse monitoring bulk item status: %v", err)
+ return
+ }
+ switch {
+ case status < 300, status == http.StatusConflict:
+ continue
+ default:
+ logp.Warn("monitoring bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
+ }
+ }
+ }
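Any 2xx item status, as well as 409 Conflict, counts as a non-failure here; everything else is logged as a warning without failing the publish. To make the control flow concrete, here is a hedged sketch of these helpers applied to a canned bulk response — illustrative only, not code from the commit, and assuming the exported esout.NewJSONReader, esout.BulkReadToItems, and esout.BulkReadItemStatus helpers introduced in this change:

// Illustrative sketch (not part of this commit): a canned response with
// one success (201), one conflict (409), and one mapping error (400)
// should produce exactly one warning, mirroring logBulkFailures above.
result := esout.BulkResult([]byte(`{"took":3,"errors":true,"items":[
	{"index":{"status":201}},
	{"index":{"status":409}},
	{"index":{"status":400,"error":{"type":"mapper_parsing_exception"}}}]}`))

reader := esout.NewJSONReader(result)
if err := esout.BulkReadToItems(reader); err != nil {
	return
}
for i := 0; i < 3; i++ {
	status, msg, err := esout.BulkReadItemStatus(reader)
	if err != nil {
		return
	}
	if status >= 300 && status != http.StatusConflict {
		logp.Warn("bulk item %v failed (status=%v): %s", i, status, msg)
	}
}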
29 changes: 10 additions & 19 deletions libbeat/outputs/elasticsearch/bulkapi.go
@@ -19,6 +19,7 @@ package elasticsearch

import (
"bytes"
+ "encoding/json"
"io"
"io/ioutil"
"net/http"
@@ -34,16 +35,15 @@ type bulkRequest struct {
requ *http.Request
}

- type bulkResult struct {
- raw []byte
- }
+ // BulkResult contains the result of a bulk API request.
+ type BulkResult json.RawMessage

// Bulk performs many index/delete operations in a single API call.
// Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
func (conn *Connection) Bulk(
index, docType string,
params map[string]string, body []interface{},
- ) (*QueryResult, error) {
+ ) (BulkResult, error) {
return conn.BulkWith(index, docType, params, nil, body)
}
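Defining BulkResult as a json.RawMessage alias, instead of the eagerly parsed *QueryResult, hands the response body back untouched, so each caller decodes only what it needs. A minimal sketch of that pattern, assuming nothing beyond the standard library (illustrative, not code from the commit):

// Sketch only: BulkResult is raw JSON, so the top-level summary can be
// decoded lazily while per-item parsing stays with the streaming reader.
var summary struct {
	Took   int  `json:"took"`
	Errors bool `json:"errors"`
}
if err := json.Unmarshal([]byte(result), &summary); err != nil {
	return err
}
if summary.Errors {
	// fall back to BulkReadToItems / BulkReadItemStatus for details
}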

@@ -56,7 +56,7 @@ func (conn *Connection) BulkWith(
params map[string]string,
metaBuilder MetaBuilder,
body []interface{},
- ) (*QueryResult, error) {
+ ) (BulkResult, error) {
if len(body) == 0 {
return nil, nil
}
@@ -76,7 +76,7 @@ func (conn *Connection) BulkWith(
if err != nil {
return nil, err
}
- return readQueryResult(result.raw)
+ return result, nil
}

// SendMonitoringBulk creates a HTTP request to the X-Pack Monitoring API containing a bunch of
@@ -85,7 +85,7 @@ func (conn *Connection) BulkWith(
func (conn *Connection) SendMonitoringBulk(
params map[string]string,
body []interface{},
- ) (*QueryResult, error) {
+ ) (BulkResult, error) {
if len(body) == 0 {
return nil, nil
}
@@ -111,7 +111,7 @@ func (conn *Connection) SendMonitoringBulk(
if err != nil {
return nil, err
}
- return readQueryResult(result.raw)
+ return result, nil
}

func newBulkRequest(
@@ -199,18 +199,9 @@ func (r *bulkRequest) Reset(body bodyEncoder) {
body.AddHeader(&r.requ.Header)
}

- func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, bulkResult, error) {
+ func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, BulkResult, error) {
status, resp, err := conn.execHTTPRequest(requ.requ)
- if err != nil {
- return status, bulkResult{}, err
- }

- result, err := readBulkResult(resp)
- return status, result, err
- }

- func readBulkResult(obj []byte) (bulkResult, error) {
- return bulkResult{obj}, nil
+ return status, BulkResult(resp), err
}
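With readBulkResult gone, sendBulkRequest is a thin pass-through: transport problems surface through the status code and err, while item-level failures stay encoded in the raw body until a reader walks it. A hedged sketch of the resulting caller pattern (illustrative only; the names match the diff, but the error handling shown is an assumption):

// Illustrative sketch: transport failures and item failures are
// handled at different layers after this change.
status, result, err := conn.sendBulkRequest(requ)
if err != nil {
	return err // connection or HTTP-level failure
}
if status >= 300 {
	return fmt.Errorf("bulk request returned status %d", status)
}
// result still holds the raw response; per-item errors are extracted
// later via BulkReadToItems and BulkReadItemStatus.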

func bulkEncode(out bulkWriter, metaBuilder MetaBuilder, body []interface{}) error {
8 changes: 2 additions & 6 deletions libbeat/outputs/elasticsearch/bulkapi_mock_test.go
@@ -20,7 +20,6 @@
package elasticsearch

import (
- "encoding/json"
"fmt"
"net/http"
"os"
@@ -34,7 +33,7 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("elasticsearch"))

index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid())
- expectedResp, _ := json.Marshal(QueryResult{Ok: true, Index: index, Type: "type1", ID: "1", Version: 1, Created: true})
+ expectedResp := []byte(`{"took":7,"errors":false,"items":[]}`)

ops := []map[string]interface{}{
{
@@ -61,13 +60,10 @@
params := map[string]string{
"refresh": "true",
}
- resp, err := client.Bulk(index, "type1", params, body)
+ _, err := client.Bulk(index, "type1", params, body)
if err != nil {
t.Errorf("Bulk() returns error: %s", err)
}
- if !resp.Created {
- t.Errorf("Bulk() fails: %s", resp)
- }
}

func TestOneHost500Resp_Bulk(t *testing.T) {
82 changes: 45 additions & 37 deletions libbeat/outputs/elasticsearch/client.go
@@ -52,7 +52,7 @@ type Client struct {
bulkRequ *bulkRequest

// buffered json response reader
- json jsonReader
+ json JSONReader

// additional configs
compressionLevel int
@@ -125,6 +125,7 @@
)

var (
+ errExpectedItemsArray = errors.New("expected items array")
errExpectedItemObject = errors.New("expected item response object")
errExpectedStatusCode = errors.New("expected item status code")
errUnexpectedEmptyObject = errors.New("empty object")
@@ -355,7 +356,7 @@ func (client *Client) publishEvents(
failedEvents = data
stats.fails = len(failedEvents)
} else {
- client.json.init(result.raw)
+ client.json.init(result)
failedEvents, stats = bulkCollectPublishFails(&client.json, data)
}

@@ -473,46 +474,19 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error)
// event failed due to some error in the event itself (e.g. does not respect mapping),
// the event will be dropped.
func bulkCollectPublishFails(
- reader *jsonReader,
+ reader *JSONReader,
data []publisher.Event,
) ([]publisher.Event, bulkResultStats) {
- if err := reader.expectDict(); err != nil {
- logp.Err("Failed to parse bulk response: expected JSON object")
- return nil, bulkResultStats{}
- }

- // find 'items' field in response
- for {
- kind, name, err := reader.nextFieldName()
- if err != nil {
- logp.Err("Failed to parse bulk response")
- return nil, bulkResultStats{}
- }

- if kind == dictEnd {
- logp.Err("Failed to parse bulk response: no 'items' field in response")
- return nil, bulkResultStats{}
- }

- // found items array -> continue
- if bytes.Equal(name, nameItems) {
- break
- }

- reader.ignoreNext()
- }

- // check items field is an array
- if err := reader.expectArray(); err != nil {
- logp.Err("Failed to parse bulk response: expected items array")
+ if err := BulkReadToItems(reader); err != nil {
+ logp.Err("failed to parse bulk response: %v", err.Error())
return nil, bulkResultStats{}
}

count := len(data)
failed := data[:0]
stats := bulkResultStats{}
for i := 0; i < count; i++ {
- status, msg, err := itemStatus(reader)
+ status, msg, err := BulkReadItemStatus(reader)
if err != nil {
return nil, bulkResultStats{}
}
@@ -548,9 +522,43 @@
return failed, stats
}

- func itemStatus(reader *jsonReader) (int, []byte, error) {
+ // BulkReadToItems reads the bulk response up to (but not including) the items array.
+ func BulkReadToItems(reader *JSONReader) error {
+ if err := reader.ExpectDict(); err != nil {
+ return errExpectedObject
+ }

+ // find 'items' field in response
+ for {
+ kind, name, err := reader.nextFieldName()
+ if err != nil {
+ return err
+ }

+ if kind == dictEnd {
+ return errExpectedItemsArray
+ }

+ // found items array -> continue
+ if bytes.Equal(name, nameItems) {
+ break
+ }

+ reader.ignoreNext()
+ }

+ // check items field is an array
+ if err := reader.ExpectArray(); err != nil {
+ return errExpectedItemsArray
+ }

+ return nil
+ }

+ // BulkReadItemStatus reads the status and error fields from a bulk item.
+ func BulkReadItemStatus(reader *JSONReader) (int, []byte, error) {
// skip outer dictionary
- if err := reader.expectDict(); err != nil {
+ if err := reader.ExpectDict(); err != nil {
return 0, nil, errExpectedItemObject
}

@@ -588,8 +596,8 @@
return status, msg, nil
}

- func itemStatusInner(reader *jsonReader) (int, []byte, error) {
- if err := reader.expectDict(); err != nil {
+ func itemStatusInner(reader *JSONReader) (int, []byte, error) {
+ if err := reader.ExpectDict(); err != nil {
return 0, nil, errExpectedItemObject
}
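Exporting BulkReadToItems and BulkReadItemStatus (along with the renamed ExpectDict/ExpectArray reader methods) is what lets the monitoring reporter reuse this parsing logic instead of duplicating it. The commit message mentions new unit tests; here is a hypothetical in-package sketch of the contract such tests would exercise (not the actual test code from this commit):

// Hypothetical test sketch for the newly exported helpers.
func TestBulkReadItemStatus_Conflict(t *testing.T) {
	reader := NewJSONReader([]byte(
		`{"items":[{"create":{"status":409,"error":"version conflict"}}]}`))
	if err := BulkReadToItems(reader); err != nil {
		t.Fatalf("expected to reach the items array: %v", err)
	}
	status, msg, err := BulkReadItemStatus(reader)
	if err != nil {
		t.Fatalf("unexpected parse error: %v", err)
	}
	if status != 409 {
		t.Errorf("want status 409, got %d (msg=%s)", status, msg)
	}
}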


