diff --git a/libbeat/beat/event.go b/libbeat/beat/event.go
index 4ef560420396..183b56b1ce88 100644
--- a/libbeat/beat/event.go
+++ b/libbeat/beat/event.go
@@ -54,6 +54,22 @@ func (e *Event) SetID(id string) {
 	e.Meta["_id"] = id
 }
 
+// GetMetaStringValue returns the string stored in the event metadata under key.
+// It returns the lookup error when the key cannot be read, and an empty string
+// with a nil error when the stored value is not a string.
+func (e *Event) GetMetaStringValue(key string) (string, error) {
+	tmp, err := e.Meta.GetValue(key)
+	if err != nil {
+		return "", err
+	}
+
+	if s, ok := tmp.(string); ok {
+		return s, nil
+	}
+
+	return "", nil
+}
+
 func (e *Event) GetValue(key string) (interface{}, error) {
 	if key == "@timestamp" {
 		return e.Timestamp, nil
diff --git a/libbeat/esleg/eslegclient/bulkapi.go b/libbeat/esleg/eslegclient/bulkapi.go
index 86b518eeea1c..dfc22c310a9e 100644
--- a/libbeat/esleg/eslegclient/bulkapi.go
+++ b/libbeat/esleg/eslegclient/bulkapi.go
@@ -42,6 +42,10 @@ type BulkCreateAction struct {
 	Create BulkMeta `json:"create" struct:"create"`
 }
 
+type BulkDeleteAction struct {
+	Delete BulkMeta `json:"delete" struct:"delete"`
+}
+
 type BulkMeta struct {
 	Index    string `json:"_index" struct:"_index"`
 	DocType  string `json:"_type,omitempty" struct:"_type,omitempty"`
diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go
index bee2769cb9eb..26527c4257b8 100644
--- a/libbeat/outputs/elasticsearch/client.go
+++ b/libbeat/outputs/elasticsearch/client.go
@@ -65,8 +65,16 @@ type bulkResultStats struct {
 
 const (
 	defaultEventType = "doc"
+	opTypeCreate     = "create"
+	opTypeDelete     = "delete"
+	opTypeIndex      = "index"
 )
 
+// opTypeKey defines the metadata key name for event operation type.
+// The key's value can be an empty string, `create`, `index`, or `delete`. If empty, it will assume
+// either `create` or `index`. See `createEventBulkMeta`. If in doubt, set explicitly.
+const opTypeKey = "op_type"
+
 // NewClient instantiates a new client.
 func NewClient(
 	s ClientSettings,
@@ -272,7 +280,12 @@ func bulkEncodePublishRequest(
 			log.Errorf("Failed to encode event meta data: %+v", err)
 			continue
 		}
-		bulkItems = append(bulkItems, meta, event)
+		if opType, err := event.GetMetaStringValue(opTypeKey); err == nil && opType == opTypeDelete {
+			// We don't include the event source in a bulk DELETE
+			bulkItems = append(bulkItems, meta)
+		} else {
+			bulkItems = append(bulkItems, meta, event)
+		}
 		okEvents = append(okEvents, data[i])
 	}
 	return okEvents, bulkItems
@@ -302,16 +315,10 @@ func createEventBulkMeta(
 		return nil, err
 	}
 
-	var id string
-	if m := event.Meta; m != nil {
-		if tmp := m["_id"]; tmp != nil {
-			if s, ok := tmp.(string); ok {
-				id = s
-			} else {
-				log.Errorf("Event ID '%v' is no string value", id)
-			}
-		}
-	}
+	// A missing or non-string meta value yields "", which is handled as
+	// "not set" below, so the lookup errors are deliberately ignored.
+	id, _ := event.GetMetaStringValue("_id")
+	opType, _ := event.GetMetaStringValue(opTypeKey)
 
 	meta := eslegclient.BulkMeta{
 		Index:    index,
@@ -320,7 +327,16 @@ func createEventBulkMeta(
 		ID:       id,
 	}
 
+	if opType == opTypeDelete {
+		if id == "" {
+			return nil, fmt.Errorf("%s %s requires _id", opTypeKey, opTypeDelete)
+		}
+		return eslegclient.BulkDeleteAction{Delete: meta}, nil
+	}
 	if id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) {
+		if opType == opTypeIndex {
+			return eslegclient.BulkIndexAction{Index: meta}, nil
+		}
 		return eslegclient.BulkCreateAction{Create: meta}, nil
 	}
 	return eslegclient.BulkIndexAction{Index: meta}, nil
diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go
index d69849dabab5..fd60b8a1ccdb 100644
--- a/libbeat/outputs/elasticsearch/client_test.go
+++ b/libbeat/outputs/elasticsearch/client_test.go
@@ -318,6 +318,68 @@ func TestBulkEncodeEvents(t *testing.T) {
 	}
 }
 
+func TestBulkEncodeEventsWithOpType(t *testing.T) {
+	cases := []common.MapStr{
+		{"_id": "111", "op_type": "index", "message": "test 1", "bulkIndex": 0},
+		{"_id": "112", "op_type": "", "message": "test 2", "bulkIndex": 2},
+		{"_id": "", "op_type": "delete", "message": "test 6", "bulkIndex": -1}, // this won't get encoded due to missing _id
+		{"_id": "", "op_type": "", "message": "test 3", "bulkIndex": 4},
+		{"_id": "114", "op_type": "delete", "message": "test 4", "bulkIndex": 6},
+		{"_id": "115", "op_type": "index", "message": "test 5", "bulkIndex": 7},
+	}
+
+	cfg := common.MustNewConfigFrom(common.MapStr{})
+	info := beat.Info{
+		IndexPrefix: "test",
+		Version:     version.GetDefaultVersion(),
+	}
+
+	im, err := idxmgmt.DefaultSupport(nil, info, common.NewConfig())
+	require.NoError(t, err)
+
+	index, pipeline, err := buildSelectors(im, info, cfg)
+	require.NoError(t, err)
+
+	events := make([]publisher.Event, len(cases))
+	for i, fields := range cases {
+		events[i] = publisher.Event{
+			Content: beat.Event{
+				Meta: common.MapStr{
+					"_id":     fields["_id"],
+					"op_type": fields["op_type"],
+				},
+				Fields: common.MapStr{
+					"message": fields["message"],
+				},
+			}}
+	}
+
+	encoded, bulkItems := bulkEncodePublishRequest(logp.L(), *common.MustNewVersion(version.GetDefaultVersion()), index, pipeline, events)
+	require.Equal(t, len(events)-1, len(encoded), "all events except the delete without _id should have been encoded")
+	require.Equal(t, 9, len(bulkItems), "incomplete bulk")
+
+	for i := 0; i < len(cases); i++ {
+		bulkEventIndex, _ := cases[i]["bulkIndex"].(int)
+		if bulkEventIndex == -1 {
+			continue
+		}
+		caseOpType, _ := cases[i]["op_type"].(string)
+		caseMessage, _ := cases[i]["message"].(string)
+		switch bulkItems[bulkEventIndex].(type) {
+		case eslegclient.BulkCreateAction:
+			validOpTypes := []string{opTypeCreate, ""}
+			require.Contains(t, validOpTypes, caseOpType, caseMessage)
+		case eslegclient.BulkIndexAction:
+			require.Equal(t, opTypeIndex, caseOpType, caseMessage)
+		case eslegclient.BulkDeleteAction:
+			require.Equal(t, opTypeDelete, caseOpType, caseMessage)
+		default:
+			require.FailNow(t, "unknown type")
+		}
+	}
+
+}
+
 func TestClientWithAPIKey(t *testing.T) {
 	var headers http.Header
 