Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow delete and index actions with a document ID #12606

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@ func (e *Event) SetID(id string) {
e.Meta["_id"] = id
}

func (e *Event) GetMetaStringValue(key string) (string, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: The function can be simplified to this:

tmp, err := e.GetValue("@metadata." + key)
if err == nil {
    if s, ok := tmp.(string); ok {
	return s, nil
    }
}
return "", nil

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about (requires no alloc):

tmp, err := e.Meta.GetValue(key)
if err != nil {
  return "", err
}

if s, ok := tmp.(string); ok {
  return s, nil
}

return "", nil

All in all I'd prefer to keep the interface of Event as small as possible and provide helpers as functions instead of methods.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is even better. \o/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback. I've used the version from @urso . Nice to see this level of attention to detail!

tmp, err := e.Meta.GetValue(key)
if err != nil {
return "", err
}

if s, ok := tmp.(string); ok {
return s, nil
}

return "", nil
}

func (e *Event) GetValue(key string) (interface{}, error) {
if key == "@timestamp" {
return e.Timestamp, nil
Expand Down
4 changes: 4 additions & 0 deletions libbeat/esleg/eslegclient/bulkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type BulkCreateAction struct {
Create BulkMeta `json:"create" struct:"create"`
}

type BulkDeleteAction struct {
Delete BulkMeta `json:"delete" struct:"delete"`
}

type BulkMeta struct {
Index string `json:"_index" struct:"_index"`
DocType string `json:"_type,omitempty" struct:"_type,omitempty"`
Expand Down
37 changes: 26 additions & 11 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,16 @@ type bulkResultStats struct {

const (
defaultEventType = "doc"
opTypeCreate = "create"
opTypeDelete = "delete"
opTypeIndex = "index"
)

// opTypeKey defines the metadata key name for event operation type.
// The key's value can be an empty string, `create`, `index`, or `delete`. If empty, it will assume
// either `create` or `index`. See `createEventBulkMeta`. If in doubt, set explicitly.
const opTypeKey = "op_type"

// NewClient instantiates a new client.
func NewClient(
s ClientSettings,
Expand Down Expand Up @@ -272,7 +280,12 @@ func bulkEncodePublishRequest(
log.Errorf("Failed to encode event meta data: %+v", err)
continue
}
bulkItems = append(bulkItems, meta, event)
if opType, err := event.GetMetaStringValue(opTypeKey); err == nil && opType == opTypeDelete {
// We don't include the event source in a bulk DELETE
bulkItems = append(bulkItems, meta)
} else {
bulkItems = append(bulkItems, meta, event)
}
okEvents = append(okEvents, data[i])
}
return okEvents, bulkItems
Expand Down Expand Up @@ -302,16 +315,8 @@ func createEventBulkMeta(
return nil, err
}

var id string
if m := event.Meta; m != nil {
if tmp := m["_id"]; tmp != nil {
if s, ok := tmp.(string); ok {
id = s
} else {
log.Errorf("Event ID '%v' is no string value", id)
}
}
}
id, _ := event.GetMetaStringValue("_id")
opType, _ := event.GetMetaStringValue(opTypeKey)

meta := eslegclient.BulkMeta{
Index: index,
Expand All @@ -320,7 +325,17 @@ func createEventBulkMeta(
ID: id,
}

if opType == opTypeDelete {
if id != "" {
return eslegclient.BulkDeleteAction{Delete: meta}, nil
ycombinator marked this conversation as resolved.
Show resolved Hide resolved
} else {
return nil, fmt.Errorf("%s %s requires _id", opTypeKey, opTypeDelete)
}
}
if id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) {
if opType == opTypeIndex {
return eslegclient.BulkIndexAction{Index: meta}, nil
}
return eslegclient.BulkCreateAction{Create: meta}, nil
}
return eslegclient.BulkIndexAction{Index: meta}, nil
Expand Down
62 changes: 62 additions & 0 deletions libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,68 @@ func TestBulkEncodeEvents(t *testing.T) {
}
}

func TestBulkEncodeEventsWithOpType(t *testing.T) {
cases := []common.MapStr{
{"_id": "111", "op_type": "index", "message": "test 1", "bulkIndex": 0},
{"_id": "112", "op_type": "", "message": "test 2", "bulkIndex": 2},
{"_id": "", "op_type": "delete", "message": "test 6", "bulkIndex": -1}, // this won't get encoded due to missing _id
{"_id": "", "op_type": "", "message": "test 3", "bulkIndex": 4},
{"_id": "114", "op_type": "delete", "message": "test 4", "bulkIndex": 6},
{"_id": "115", "op_type": "index", "message": "test 5", "bulkIndex": 7},
}

cfg := common.MustNewConfigFrom(common.MapStr{})
info := beat.Info{
IndexPrefix: "test",
Version: version.GetDefaultVersion(),
}

im, err := idxmgmt.DefaultSupport(nil, info, common.NewConfig())
require.NoError(t, err)

index, pipeline, err := buildSelectors(im, info, cfg)
require.NoError(t, err)

events := make([]publisher.Event, len(cases))
for i, fields := range cases {
events[i] = publisher.Event{
Content: beat.Event{
Meta: common.MapStr{
"_id": fields["_id"],
"op_type": fields["op_type"],
},
Fields: common.MapStr{
"message": fields["message"],
},
}}
}

encoded, bulkItems := bulkEncodePublishRequest(logp.L(), *common.MustNewVersion(version.GetDefaultVersion()), index, pipeline, events)
require.Equal(t, len(events)-1, len(encoded), "all events should have been encoded")
require.Equal(t, 9, len(bulkItems), "incomplete bulk")

for i := 0; i < len(cases); i++ {
bulkEventIndex, _ := cases[i]["bulkIndex"].(int)
if bulkEventIndex == -1 {
continue
}
caseOpType, _ := cases[i]["op_type"].(string)
caseMessage, _ := cases[i]["message"].(string)
switch bulkItems[bulkEventIndex].(type) {
case eslegclient.BulkCreateAction:
validOpTypes := []string{opTypeCreate, ""}
require.Contains(t, validOpTypes, caseOpType, caseMessage)
case eslegclient.BulkIndexAction:
require.Equal(t, opTypeIndex, caseOpType, caseMessage)
case eslegclient.BulkDeleteAction:
require.Equal(t, opTypeDelete, caseOpType, caseMessage)
default:
require.FailNow(t, "unknown type")
}
}

}

func TestClientWithAPIKey(t *testing.T) {
var headers http.Header

Expand Down